1 // Provides an efficient blocking version of moodycamel::ConcurrentQueue.
2 // ©2015-2016 Cameron Desrochers. Distributed under the terms of the simplified
3 // BSD license, available at the top of concurrentqueue.h.
4 // Uses Jeff Preshing's semaphore implementation (under the terms of its
5 // separate zlib license, embedded below).
6 
7 #pragma once
8 
9 #include "concurrentqueue.h"
10 #include <type_traits>
11 #include <cerrno>
12 #include <memory>
13 #include <chrono>
14 #include <ctime>
15 
16 #if defined(_WIN32)
17 // Avoid including windows.h in a header; we only need a handful of
18 // items, so we'll redeclare them here (this is relatively safe since
19 // the API generally has to remain stable between Windows versions).
20 // I know this is an ugly hack but it still beats polluting the global
21 // namespace with thousands of generic names or adding a .cpp for nothing.
22 extern "C" {
23 	struct _SECURITY_ATTRIBUTES;
24 	__declspec(dllimport) void* __stdcall CreateSemaphoreW(_SECURITY_ATTRIBUTES* lpSemaphoreAttributes, long lInitialCount, long lMaximumCount, const wchar_t* lpName);
25 	__declspec(dllimport) int __stdcall CloseHandle(void* hObject);
26 	__declspec(dllimport) unsigned long __stdcall WaitForSingleObject(void* hHandle, unsigned long dwMilliseconds);
27 	__declspec(dllimport) int __stdcall ReleaseSemaphore(void* hSemaphore, long lReleaseCount, long* lpPreviousCount);
28 }
29 #elif defined(__MACH__)
30 #include <mach/mach.h>
31 #elif defined(__unix__)
32 #include <semaphore.h>
33 #endif
34 
35 namespace moodycamel
36 {
37 namespace details
38 {
39 	// Code in the mpmc_sema namespace below is an adaptation of Jeff Preshing's
40 	// portable + lightweight semaphore implementations, originally from
41 	// https://github.com/preshing/cpp11-on-multicore/blob/master/common/sema.h
42 	// LICENSE:
43 	// Copyright (c) 2015 Jeff Preshing
44 	//
45 	// This software is provided 'as-is', without any express or implied
46 	// warranty. In no event will the authors be held liable for any damages
47 	// arising from the use of this software.
48 	//
49 	// Permission is granted to anyone to use this software for any purpose,
50 	// including commercial applications, and to alter it and redistribute it
51 	// freely, subject to the following restrictions:
52 	//
53 	// 1. The origin of this software must not be misrepresented; you must not
54 	//	claim that you wrote the original software. If you use this software
55 	//	in a product, an acknowledgement in the product documentation would be
56 	//	appreciated but is not required.
57 	// 2. Altered source versions must be plainly marked as such, and must not be
58 	//	misrepresented as being the original software.
59 	// 3. This notice may not be removed or altered from any source distribution.
60 	namespace mpmc_sema
61 	{
62 #if defined(_WIN32)
63 		class Semaphore
64 		{
65 		private:
66 			void* m_hSema;
67 
68 			Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
69 			Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
70 
71 		public:
72 			Semaphore(int initialCount = 0)
73 			{
74 				assert(initialCount >= 0);
75 				const long maxLong = 0x7fffffff;
76 				m_hSema = CreateSemaphoreW(nullptr, initialCount, maxLong, nullptr);
77 			}
78 
~Semaphore()79 			~Semaphore()
80 			{
81 				CloseHandle(m_hSema);
82 			}
83 
wait()84 			void wait()
85 			{
86 				const unsigned long infinite = 0xffffffff;
87 				WaitForSingleObject(m_hSema, infinite);
88 			}
89 
try_wait()90 			bool try_wait()
91 			{
92 				const unsigned long RC_WAIT_TIMEOUT = 0x00000102;
93 				return WaitForSingleObject(m_hSema, 0) != RC_WAIT_TIMEOUT;
94 			}
95 
timed_wait(std::uint64_t usecs)96 			bool timed_wait(std::uint64_t usecs)
97 			{
98 				const unsigned long RC_WAIT_TIMEOUT = 0x00000102;
99 				return WaitForSingleObject(m_hSema, (unsigned long)(usecs / 1000)) != RC_WAIT_TIMEOUT;
100 			}
101 
102 			void signal(int count = 1)
103 			{
104 				ReleaseSemaphore(m_hSema, count, nullptr);
105 			}
106 		};
107 #elif defined(__MACH__)
108 		//---------------------------------------------------------
109 		// Semaphore (Apple iOS and OSX)
110 		// Can't use POSIX semaphores due to http://lists.apple.com/archives/darwin-kernel/2009/Apr/msg00010.html
111 		//---------------------------------------------------------
112 		class Semaphore
113 		{
114 		private:
115 			semaphore_t m_sema;
116 
117 			Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
118 			Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
119 
120 		public:
121 			Semaphore(int initialCount = 0)
122 			{
123 				assert(initialCount >= 0);
124 				semaphore_create(mach_task_self(), &m_sema, SYNC_POLICY_FIFO, initialCount);
125 			}
126 
127 			~Semaphore()
128 			{
129 				semaphore_destroy(mach_task_self(), m_sema);
130 			}
131 
132 			void wait()
133 			{
134 				semaphore_wait(m_sema);
135 			}
136 
137 			bool try_wait()
138 			{
139 				return timed_wait(0);
140 			}
141 
142 			bool timed_wait(std::uint64_t timeout_usecs)
143 			{
144 				mach_timespec_t ts;
145 				ts.tv_sec = timeout_usecs / 1000000;
146 				ts.tv_nsec = (timeout_usecs % 1000000) * 1000;
147 
148 				// added in OSX 10.10: https://developer.apple.com/library/prerelease/mac/documentation/General/Reference/APIDiffsMacOSX10_10SeedDiff/modules/Darwin.html
149 				kern_return_t rc = semaphore_timedwait(m_sema, ts);
150 
151 				return rc != KERN_OPERATION_TIMED_OUT;
152 			}
153 
154 			void signal()
155 			{
156 				semaphore_signal(m_sema);
157 			}
158 
159 			void signal(int count)
160 			{
161 				while (count-- > 0)
162 				{
163 					semaphore_signal(m_sema);
164 				}
165 			}
166 		};
167 #elif defined(__unix__)
168 		//---------------------------------------------------------
169 		// Semaphore (POSIX, Linux)
170 		//---------------------------------------------------------
171 		class Semaphore
172 		{
173 		private:
174 			sem_t m_sema;
175 
176 			Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
177 			Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
178 
179 		public:
180 			Semaphore(int initialCount = 0)
181 			{
182 				assert(initialCount >= 0);
183 				sem_init(&m_sema, 0, initialCount);
184 			}
185 
186 			~Semaphore()
187 			{
188 				sem_destroy(&m_sema);
189 			}
190 
191 			void wait()
192 			{
193 				// http://stackoverflow.com/questions/2013181/gdb-causes-sem-wait-to-fail-with-eintr-error
194 				int rc;
195 				do {
196 					rc = sem_wait(&m_sema);
197 				} while (rc == -1 && errno == EINTR);
198 			}
199 
200 			bool try_wait()
201 			{
202 				int rc;
203 				do {
204 					rc = sem_trywait(&m_sema);
205 				} while (rc == -1 && errno == EINTR);
206 				return !(rc == -1 && errno == EAGAIN);
207 			}
208 
209 			bool timed_wait(std::uint64_t usecs)
210 			{
211 				struct timespec ts;
212 				const int usecs_in_1_sec = 1000000;
213 				const int nsecs_in_1_sec = 1000000000;
214 				clock_gettime(CLOCK_REALTIME, &ts);
215 				ts.tv_sec += usecs / usecs_in_1_sec;
216 				ts.tv_nsec += (usecs % usecs_in_1_sec) * 1000;
217 				// sem_timedwait bombs if you have more than 1e9 in tv_nsec
218 				// so we have to clean things up before passing it in
219 				if (ts.tv_nsec > nsecs_in_1_sec) {
220 					ts.tv_nsec -= nsecs_in_1_sec;
221 					++ts.tv_sec;
222 				}
223 
224 				int rc;
225 				do {
226 					rc = sem_timedwait(&m_sema, &ts);
227 				} while (rc == -1 && errno == EINTR);
228 				return !(rc == -1 && errno == ETIMEDOUT);
229 			}
230 
231 			void signal()
232 			{
233 				sem_post(&m_sema);
234 			}
235 
236 			void signal(int count)
237 			{
238 				while (count-- > 0)
239 				{
240 					sem_post(&m_sema);
241 				}
242 			}
243 		};
244 #else
245 #error Unsupported platform! (No semaphore wrapper available)
246 #endif
247 
248 		//---------------------------------------------------------
249 		// LightweightSemaphore
250 		//---------------------------------------------------------
		// Semaphore that tracks its permits in an atomic counter and only touches
		// the platform Semaphore (m_sema) when a waiter has to block or a blocked
		// waiter has to be released.
		class LightweightSemaphore
		{
		public:
			// Signed counterpart of std::size_t; the count must be able to go negative.
			typedef std::make_signed<std::size_t>::type ssize_t;

		private:
			// Positive: permits available. Negative: waiters that have decremented
			// the count and are (or are about to be) blocked on m_sema.
			std::atomic<ssize_t> m_count;
			Semaphore m_sema;

			// Slow path for acquiring one permit.
			// Spins for a bounded number of iterations, then decrements the count
			// unconditionally and blocks on m_sema if no permit was available.
			// timeout_usecs < 0 waits indefinitely; otherwise it bounds the
			// blocking wait in microseconds. Returns true if a permit was acquired,
			// false if the timed wait expired.
			bool waitWithPartialSpinning(std::int64_t timeout_usecs = -1)
			{
				ssize_t oldCount;
				// Is there a better way to set the initial spin count?
				// If we lower it to 1000, testBenaphore becomes 15x slower on my Core i7-5930K Windows PC,
				// as threads start hitting the kernel semaphore.
				int spin = 10000;
				while (--spin >= 0)
				{
					oldCount = m_count.load(std::memory_order_relaxed);
					// Only take a permit while the count is positive.
					if ((oldCount > 0) && m_count.compare_exchange_strong(oldCount, oldCount - 1, std::memory_order_acquire, std::memory_order_relaxed))
						return true;
					std::atomic_signal_fence(std::memory_order_acquire);	 // Prevent the compiler from collapsing the loop.
				}
				// Spinning did not succeed: register as a waiter by decrementing
				// regardless of the current value.
				oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
				if (oldCount > 0)
					return true;	// a permit appeared in the meantime
				if (timeout_usecs < 0)
				{
					m_sema.wait();
					return true;
				}
				if (m_sema.timed_wait((std::uint64_t)timeout_usecs))
					return true;
				// At this point, we've timed out waiting for the semaphore, but the
				// count is still decremented indicating we may still be waiting on
				// it. So we have to re-adjust the count, but only if the semaphore
				// wasn't signaled enough times for us, too, since then. If it was, we
				// need to release the semaphore too.
				while (true)
				{
					oldCount = m_count.load(std::memory_order_acquire);
					// Count is non-negative again: try to consume the matching
					// signal from m_sema.
					if (oldCount >= 0 && m_sema.try_wait())
						return true;
					// Still negative: undo our own decrement and report the timeout.
					if (oldCount < 0 && m_count.compare_exchange_strong(oldCount, oldCount + 1, std::memory_order_relaxed))
						return false;
				}
			}

			// Slow path for acquiring between 1 and `max` permits (0 on timeout).
			// Same spin / decrement / block sequence as waitWithPartialSpinning;
			// once one permit is held, the rest are taken with tryWaitMany.
			ssize_t waitManyWithPartialSpinning(ssize_t max, std::int64_t timeout_usecs = -1)
			{
				assert(max > 0);
				ssize_t oldCount;
				int spin = 10000;
				while (--spin >= 0)
				{
					oldCount = m_count.load(std::memory_order_relaxed);
					if (oldCount > 0)
					{
						// Take as many as are available, capped at `max`.
						ssize_t newCount = oldCount > max ? oldCount - max : 0;
						if (m_count.compare_exchange_strong(oldCount, newCount, std::memory_order_acquire, std::memory_order_relaxed))
							return oldCount - newCount;
					}
					std::atomic_signal_fence(std::memory_order_acquire);
				}
				// Register as a waiter for a single permit.
				oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
				if (oldCount <= 0)
				{
					if (timeout_usecs < 0)
						m_sema.wait();
					else if (!m_sema.timed_wait((std::uint64_t)timeout_usecs))
					{
						// Timed out: same count re-adjustment as in
						// waitWithPartialSpinning, returning 0 if nothing was acquired.
						while (true)
						{
							oldCount = m_count.load(std::memory_order_acquire);
							if (oldCount >= 0 && m_sema.try_wait())
								break;
							if (oldCount < 0 && m_count.compare_exchange_strong(oldCount, oldCount + 1, std::memory_order_relaxed))
								return 0;
						}
					}
				}
				// One permit is held here; opportunistically grab up to max - 1 more.
				if (max > 1)
					return 1 + tryWaitMany(max - 1);
				return 1;
			}

		public:
			// Creates the semaphore with `initialCount` permits (must be >= 0).
			LightweightSemaphore(ssize_t initialCount = 0) : m_count(initialCount)
			{
				assert(initialCount >= 0);
			}

			// Takes one permit without blocking. Returns false if the count was
			// not positive.
			bool tryWait()
			{
				ssize_t oldCount = m_count.load(std::memory_order_relaxed);
				while (oldCount > 0)
				{
					// On failure compare_exchange_weak reloads oldCount, so the
					// loop condition is re-evaluated against the fresh value.
					if (m_count.compare_exchange_weak(oldCount, oldCount - 1, std::memory_order_acquire, std::memory_order_relaxed))
						return true;
				}
				return false;
			}

			// Takes one permit, blocking for as long as necessary.
			void wait()
			{
				if (!tryWait())
					waitWithPartialSpinning();
			}

			// Takes one permit, waiting at most `timeout_usecs` microseconds
			// (a negative value waits indefinitely). Returns true if acquired.
			bool wait(std::int64_t timeout_usecs)
			{
				return tryWait() || waitWithPartialSpinning(timeout_usecs);
			}

			// Acquires between 0 and (greedily) max, inclusive
			ssize_t tryWaitMany(ssize_t max)
			{
				assert(max >= 0);
				ssize_t oldCount = m_count.load(std::memory_order_relaxed);
				while (oldCount > 0)
				{
					ssize_t newCount = oldCount > max ? oldCount - max : 0;
					if (m_count.compare_exchange_weak(oldCount, newCount, std::memory_order_acquire, std::memory_order_relaxed))
						return oldCount - newCount;
				}
				return 0;
			}

			// Acquires at least one, and (greedily) at most max
			// Returns 0 when max is 0 or when the timed wait expires; a negative
			// timeout_usecs waits indefinitely.
			ssize_t waitMany(ssize_t max, std::int64_t timeout_usecs)
			{
				assert(max >= 0);
				ssize_t result = tryWaitMany(max);
				if (result == 0 && max > 0)
					result = waitManyWithPartialSpinning(max, timeout_usecs);
				return result;
			}

			// Untimed variant of waitMany. The assert requires a positive result,
			// so callers are expected to pass max > 0.
			ssize_t waitMany(ssize_t max)
			{
				ssize_t result = waitMany(max, -1);
				assert(result > 0);
				return result;
			}

			// Adds `count` permits and wakes blocked waiters if there are any.
			void signal(ssize_t count = 1)
			{
				assert(count >= 0);
				ssize_t oldCount = m_count.fetch_add(count, std::memory_order_release);
				// -oldCount is the number of registered waiters when the old count
				// was negative; release min(waiters, count) of them via m_sema.
				ssize_t toRelease = -oldCount < count ? -oldCount : count;
				if (toRelease > 0)
				{
					m_sema.signal((int)toRelease);
				}
			}

			// Snapshot of the number of available permits (never negative). The
			// value is read with a relaxed load and may be stale when returned.
			ssize_t availableApprox() const
			{
				ssize_t count = m_count.load(std::memory_order_relaxed);
				return count > 0 ? count : 0;
			}
		};
413 	}	// end namespace mpmc_sema
414 }	// end namespace details
415 
416 
417 // This is a blocking version of the queue. It has an almost identical interface to
418 // the normal non-blocking version, with the addition of various wait_dequeue() methods
419 // and the removal of producer-specific dequeue methods.
420 template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
421 class BlockingConcurrentQueue
422 {
423 private:
	// Shorthand for the non-blocking queue instantiated with the same T and Traits.
	typedef ::moodycamel::ConcurrentQueue<T, Traits> ConcurrentQueue;
	// Shorthand for the semaphore implementation in details::mpmc_sema.
	typedef details::mpmc_sema::LightweightSemaphore LightweightSemaphore;

public:
	// Token types are taken directly from ConcurrentQueue.
	typedef typename ConcurrentQueue::producer_token_t producer_token_t;
	typedef typename ConcurrentQueue::consumer_token_t consumer_token_t;

	// Index and size types are taken from ConcurrentQueue; ssize_t is the
	// signed counterpart of size_t.
	typedef typename ConcurrentQueue::index_t index_t;
	typedef typename ConcurrentQueue::size_t size_t;
	typedef typename std::make_signed<size_t>::type ssize_t;

	// Constants re-exported from ConcurrentQueue so they are reachable through this class.
	static const size_t BLOCK_SIZE = ConcurrentQueue::BLOCK_SIZE;
	static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = ConcurrentQueue::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD;
	static const size_t EXPLICIT_INITIAL_INDEX_SIZE = ConcurrentQueue::EXPLICIT_INITIAL_INDEX_SIZE;
	static const size_t IMPLICIT_INITIAL_INDEX_SIZE = ConcurrentQueue::IMPLICIT_INITIAL_INDEX_SIZE;
	static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = ConcurrentQueue::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE;
	static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = ConcurrentQueue::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE;
	static const size_t MAX_SUBQUEUE_SIZE = ConcurrentQueue::MAX_SUBQUEUE_SIZE;
442 
443 public:
444 	// Creates a queue with at least `capacity` element slots; note that the
445 	// actual number of elements that can be inserted without additional memory
446 	// allocation depends on the number of producers and the block size (e.g. if
447 	// the block size is equal to `capacity`, only a single block will be allocated
448 	// up-front, which means only a single producer will be able to enqueue elements
449 	// without an extra allocation -- blocks aren't shared between producers).
450 	// This method is not thread safe -- it is up to the user to ensure that the
451 	// queue is fully constructed before it starts being used by other threads (this
452 	// includes making the memory effects of construction visible, possibly with a
453 	// memory barrier).
	explicit BlockingConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
		: inner(capacity), sema(create<LightweightSemaphore>(), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
	{
		// Layout check: converting a BlockingConcurrentQueue* to a ConcurrentQueue*
		// must give the address of `inner`, i.e. `inner` sits at offset 0.
		assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
		// `sema` tests false when create<LightweightSemaphore>() did not yield a
		// semaphore; this is reported as std::bad_alloc (presumably an allocation
		// failure -- confirm against create<>()).
		if (!sema) {
			MOODYCAMEL_THROW(std::bad_alloc());
		}
	}
462 
	// Creates a queue by forwarding minCapacity, maxExplicitProducers and
	// maxImplicitProducers to the matching ConcurrentQueue constructor.
	// NOTE(review): the exact sizing semantics are defined by ConcurrentQueue,
	// which is not visible here.
	BlockingConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
		: inner(minCapacity, maxExplicitProducers, maxImplicitProducers), sema(create<LightweightSemaphore>(), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
	{
		// Layout check: converting a BlockingConcurrentQueue* to a ConcurrentQueue*
		// must give the address of `inner`, i.e. `inner` sits at offset 0.
		assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
		// `sema` tests false when create<LightweightSemaphore>() did not yield a
		// semaphore; this is reported as std::bad_alloc.
		if (!sema) {
			MOODYCAMEL_THROW(std::bad_alloc());
		}
	}
471 
472 	// Disable copying and copy assignment
	// Copy construction is disabled through MOODYCAMEL_DELETE_FUNCTION.
	BlockingConcurrentQueue(BlockingConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
	// Copy assignment is disabled the same way.
	BlockingConcurrentQueue& operator=(BlockingConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
475 
476 	// Moving is supported, but note that it is *not* a thread-safe operation.
477 	// Nobody can use the queue while it's being moved, and the memory effects
478 	// of that move must be propagated to other threads before they can use it.
479 	// Note: When a queue is moved, its tokens are still valid but can only be
480 	// used with the destination queue (i.e. semantically they are moved along
481 	// with the queue itself).
	// Move-constructs both members from `other`. Note that `other.sema` is
	// moved from here, whereas move assignment (via swap_internal) swaps.
	BlockingConcurrentQueue(BlockingConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
		: inner(std::move(other.inner)), sema(std::move(other.sema))
	{ }
485 
486 	inline BlockingConcurrentQueue& operator=(BlockingConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
487 	{
488 		return swap_internal(other);
489 	}
490 
491 	// Swaps this queue's state with the other's. Not thread-safe.
492 	// Swapping two queues does not invalidate their tokens, however
493 	// the tokens that were created for one queue must be used with
494 	// only the swapped queue (i.e. the tokens are tied to the
495 	// queue's movable state, not the object itself).
swap(BlockingConcurrentQueue & other)496 	inline void swap(BlockingConcurrentQueue& other) MOODYCAMEL_NOEXCEPT
497 	{
498 		swap_internal(other);
499 	}
500 
501 private:
swap_internal(BlockingConcurrentQueue & other)502 	BlockingConcurrentQueue& swap_internal(BlockingConcurrentQueue& other)
503 	{
504 		if (this == &other) {
505 			return *this;
506 		}
507 
508 		inner.swap(other.inner);
509 		sema.swap(other.sema);
510 		return *this;
511 	}
512 
513 public:
514 	// Enqueues a single item (by copying it).
515 	// Allocates memory if required. Only fails if memory allocation fails (or implicit
516 	// production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
517 	// or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
518 	// Thread-safe.
enqueue(T const & item)519 	inline bool enqueue(T const& item)
520 	{
521 		if (details::likely(inner.enqueue(item))) {
522 			sema->signal();
523 			return true;
524 		}
525 		return false;
526 	}
527 
528 	// Enqueues a single item (by moving it, if possible).
529 	// Allocates memory if required. Only fails if memory allocation fails (or implicit
530 	// production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
531 	// or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
532 	// Thread-safe.
enqueue(T && item)533 	inline bool enqueue(T&& item)
534 	{
535 		if (details::likely(inner.enqueue(std::move(item)))) {
536 			sema->signal();
537 			return true;
538 		}
539 		return false;
540 	}
541 
542 	// Enqueues a single item (by copying it) using an explicit producer token.
543 	// Allocates memory if required. Only fails if memory allocation fails (or
544 	// Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
545 	// Thread-safe.
enqueue(producer_token_t const & token,T const & item)546 	inline bool enqueue(producer_token_t const& token, T const& item)
547 	{
548 		if (details::likely(inner.enqueue(token, item))) {
549 			sema->signal();
550 			return true;
551 		}
552 		return false;
553 	}
554 
555 	// Enqueues a single item (by moving it, if possible) using an explicit producer token.
556 	// Allocates memory if required. Only fails if memory allocation fails (or
557 	// Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
558 	// Thread-safe.
enqueue(producer_token_t const & token,T && item)559 	inline bool enqueue(producer_token_t const& token, T&& item)
560 	{
561 		if (details::likely(inner.enqueue(token, std::move(item)))) {
562 			sema->signal();
563 			return true;
564 		}
565 		return false;
566 	}
567 
568 	// Enqueues several items.
569 	// Allocates memory if required. Only fails if memory allocation fails (or
570 	// implicit production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
571 	// is 0, or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
572 	// Note: Use std::make_move_iterator if the elements should be moved instead of copied.
573 	// Thread-safe.
574 	template<typename It>
enqueue_bulk(It itemFirst,size_t count)575 	inline bool enqueue_bulk(It itemFirst, size_t count)
576 	{
577 		if (details::likely(inner.enqueue_bulk(std::forward<It>(itemFirst), count))) {
578 			sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
579 			return true;
580 		}
581 		return false;
582 	}
583 
584 	// Enqueues several items using an explicit producer token.
585 	// Allocates memory if required. Only fails if memory allocation fails
586 	// (or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
587 	// Note: Use std::make_move_iterator if the elements should be moved
588 	// instead of copied.
589 	// Thread-safe.
590 	template<typename It>
enqueue_bulk(producer_token_t const & token,It itemFirst,size_t count)591 	inline bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
592 	{
593 		if (details::likely(inner.enqueue_bulk(token, std::forward<It>(itemFirst), count))) {
594 			sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
595 			return true;
596 		}
597 		return false;
598 	}
599 
600 	// Enqueues a single item (by copying it).
601 	// Does not allocate memory. Fails if not enough room to enqueue (or implicit
602 	// production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
603 	// is 0).
604 	// Thread-safe.
try_enqueue(T const & item)605 	inline bool try_enqueue(T const& item)
606 	{
607 		if (inner.try_enqueue(item)) {
608 			sema->signal();
609 			return true;
610 		}
611 		return false;
612 	}
613 
614 	// Enqueues a single item (by moving it, if possible).
615 	// Does not allocate memory (except for one-time implicit producer).
616 	// Fails if not enough room to enqueue (or implicit production is
617 	// disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
618 	// Thread-safe.
try_enqueue(T && item)619 	inline bool try_enqueue(T&& item)
620 	{
621 		if (inner.try_enqueue(std::move(item))) {
622 			sema->signal();
623 			return true;
624 		}
625 		return false;
626 	}
627 
628 	// Enqueues a single item (by copying it) using an explicit producer token.
629 	// Does not allocate memory. Fails if not enough room to enqueue.
630 	// Thread-safe.
try_enqueue(producer_token_t const & token,T const & item)631 	inline bool try_enqueue(producer_token_t const& token, T const& item)
632 	{
633 		if (inner.try_enqueue(token, item)) {
634 			sema->signal();
635 			return true;
636 		}
637 		return false;
638 	}
639 
640 	// Enqueues a single item (by moving it, if possible) using an explicit producer token.
641 	// Does not allocate memory. Fails if not enough room to enqueue.
642 	// Thread-safe.
try_enqueue(producer_token_t const & token,T && item)643 	inline bool try_enqueue(producer_token_t const& token, T&& item)
644 	{
645 		if (inner.try_enqueue(token, std::move(item))) {
646 			sema->signal();
647 			return true;
648 		}
649 		return false;
650 	}
651 
652 	// Enqueues several items.
653 	// Does not allocate memory (except for one-time implicit producer).
654 	// Fails if not enough room to enqueue (or implicit production is
655 	// disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
656 	// Note: Use std::make_move_iterator if the elements should be moved
657 	// instead of copied.
658 	// Thread-safe.
659 	template<typename It>
try_enqueue_bulk(It itemFirst,size_t count)660 	inline bool try_enqueue_bulk(It itemFirst, size_t count)
661 	{
662 		if (inner.try_enqueue_bulk(std::forward<It>(itemFirst), count)) {
663 			sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
664 			return true;
665 		}
666 		return false;
667 	}
668 
669 	// Enqueues several items using an explicit producer token.
670 	// Does not allocate memory. Fails if not enough room to enqueue.
671 	// Note: Use std::make_move_iterator if the elements should be moved
672 	// instead of copied.
673 	// Thread-safe.
674 	template<typename It>
try_enqueue_bulk(producer_token_t const & token,It itemFirst,size_t count)675 	inline bool try_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
676 	{
677 		if (inner.try_enqueue_bulk(token, std::forward<It>(itemFirst), count)) {
678 			sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
679 			return true;
680 		}
681 		return false;
682 	}
683 
684 
685 	// Attempts to dequeue from the queue.
686 	// Returns false if all producer streams appeared empty at the time they
687 	// were checked (so, the queue is likely but not guaranteed to be empty).
688 	// Never allocates. Thread-safe.
689 	template<typename U>
try_dequeue(U & item)690 	inline bool try_dequeue(U& item)
691 	{
692 		if (sema->tryWait()) {
693 			while (!inner.try_dequeue(item)) {
694 				continue;
695 			}
696 			return true;
697 		}
698 		return false;
699 	}
700 
701 	// Attempts to dequeue from the queue using an explicit consumer token.
702 	// Returns false if all producer streams appeared empty at the time they
703 	// were checked (so, the queue is likely but not guaranteed to be empty).
704 	// Never allocates. Thread-safe.
705 	template<typename U>
try_dequeue(consumer_token_t & token,U & item)706 	inline bool try_dequeue(consumer_token_t& token, U& item)
707 	{
708 		if (sema->tryWait()) {
709 			while (!inner.try_dequeue(token, item)) {
710 				continue;
711 			}
712 			return true;
713 		}
714 		return false;
715 	}
716 
717 	// Attempts to dequeue several elements from the queue.
718 	// Returns the number of items actually dequeued.
719 	// Returns 0 if all producer streams appeared empty at the time they
720 	// were checked (so, the queue is likely but not guaranteed to be empty).
721 	// Never allocates. Thread-safe.
722 	template<typename It>
try_dequeue_bulk(It itemFirst,size_t max)723 	inline size_t try_dequeue_bulk(It itemFirst, size_t max)
724 	{
725 		size_t count = 0;
726 		max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
727 		while (count != max) {
728 			count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
729 		}
730 		return count;
731 	}
732 
733 	// Attempts to dequeue several elements from the queue using an explicit consumer token.
734 	// Returns the number of items actually dequeued.
735 	// Returns 0 if all producer streams appeared empty at the time they
736 	// were checked (so, the queue is likely but not guaranteed to be empty).
737 	// Never allocates. Thread-safe.
738 	template<typename It>
try_dequeue_bulk(consumer_token_t & token,It itemFirst,size_t max)739 	inline size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
740 	{
741 		size_t count = 0;
742 		max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
743 		while (count != max) {
744 			count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
745 		}
746 		return count;
747 	}
748 
749 
750 
751 	// Blocks the current thread until there's something to dequeue, then
752 	// dequeues it.
753 	// Never allocates. Thread-safe.
754 	template<typename U>
wait_dequeue(U & item)755 	inline void wait_dequeue(U& item)
756 	{
757 		sema->wait();
758 		while (!inner.try_dequeue(item)) {
759 			continue;
760 		}
761 	}
762 
763 	// Blocks the current thread until either there's something to dequeue
764 	// or the timeout (specified in microseconds) expires. Returns false
765 	// without setting `item` if the timeout expires, otherwise assigns
766 	// to `item` and returns true.
767 	// Using a negative timeout indicates an indefinite timeout,
768 	// and is thus functionally equivalent to calling wait_dequeue.
769 	// Never allocates. Thread-safe.
770 	template<typename U>
wait_dequeue_timed(U & item,std::int64_t timeout_usecs)771 	inline bool wait_dequeue_timed(U& item, std::int64_t timeout_usecs)
772 	{
773 		if (!sema->wait(timeout_usecs)) {
774 			return false;
775 		}
776 		while (!inner.try_dequeue(item)) {
777 			continue;
778 		}
779 		return true;
780 	}
781 
782     // Blocks the current thread until either there's something to dequeue
783 	// or the timeout expires. Returns false without setting `item` if the
784     // timeout expires, otherwise assigns to `item` and returns true.
785 	// Never allocates. Thread-safe.
786 	template<typename U, typename Rep, typename Period>
wait_dequeue_timed(U & item,std::chrono::duration<Rep,Period> const & timeout)787 	inline bool wait_dequeue_timed(U& item, std::chrono::duration<Rep, Period> const& timeout)
788     {
789         return wait_dequeue_timed(item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
790     }
791 
792 	// Blocks the current thread until there's something to dequeue, then
793 	// dequeues it using an explicit consumer token.
794 	// Never allocates. Thread-safe.
795 	template<typename U>
wait_dequeue(consumer_token_t & token,U & item)796 	inline void wait_dequeue(consumer_token_t& token, U& item)
797 	{
798 		sema->wait();
799 		while (!inner.try_dequeue(token, item)) {
800 			continue;
801 		}
802 	}
803 
804 	// Blocks the current thread until either there's something to dequeue
805 	// or the timeout (specified in microseconds) expires. Returns false
806 	// without setting `item` if the timeout expires, otherwise assigns
807 	// to `item` and returns true.
808 	// Using a negative timeout indicates an indefinite timeout,
809 	// and is thus functionally equivalent to calling wait_dequeue.
810 	// Never allocates. Thread-safe.
811 	template<typename U>
wait_dequeue_timed(consumer_token_t & token,U & item,std::int64_t timeout_usecs)812 	inline bool wait_dequeue_timed(consumer_token_t& token, U& item, std::int64_t timeout_usecs)
813 	{
814 		if (!sema->wait(timeout_usecs)) {
815 			return false;
816 		}
817 		while (!inner.try_dequeue(token, item)) {
818 			continue;
819 		}
820 		return true;
821 	}
822 
823     // Blocks the current thread until either there's something to dequeue
824 	// or the timeout expires. Returns false without setting `item` if the
825     // timeout expires, otherwise assigns to `item` and returns true.
826 	// Never allocates. Thread-safe.
827 	template<typename U, typename Rep, typename Period>
wait_dequeue_timed(consumer_token_t & token,U & item,std::chrono::duration<Rep,Period> const & timeout)828 	inline bool wait_dequeue_timed(consumer_token_t& token, U& item, std::chrono::duration<Rep, Period> const& timeout)
829     {
830         return wait_dequeue_timed(token, item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
831     }
832 
833 	// Attempts to dequeue several elements from the queue.
834 	// Returns the number of items actually dequeued, which will
835 	// always be at least one (this method blocks until the queue
836 	// is non-empty) and at most max.
837 	// Never allocates. Thread-safe.
838 	template<typename It>
wait_dequeue_bulk(It itemFirst,size_t max)839 	inline size_t wait_dequeue_bulk(It itemFirst, size_t max)
840 	{
841 		size_t count = 0;
842 		max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
843 		while (count != max) {
844 			count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
845 		}
846 		return count;
847 	}
848 
849 	// Attempts to dequeue several elements from the queue.
850 	// Returns the number of items actually dequeued, which can
851 	// be 0 if the timeout expires while waiting for elements,
852 	// and at most max.
853 	// Using a negative timeout indicates an indefinite timeout,
854 	// and is thus functionally equivalent to calling wait_dequeue_bulk.
855 	// Never allocates. Thread-safe.
856 	template<typename It>
wait_dequeue_bulk_timed(It itemFirst,size_t max,std::int64_t timeout_usecs)857 	inline size_t wait_dequeue_bulk_timed(It itemFirst, size_t max, std::int64_t timeout_usecs)
858 	{
859 		size_t count = 0;
860 		max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs);
861 		while (count != max) {
862 			count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
863 		}
864 		return count;
865 	}
866 
867     // Attempts to dequeue several elements from the queue.
868 	// Returns the number of items actually dequeued, which can
869 	// be 0 if the timeout expires while waiting for elements,
870 	// and at most max.
871 	// Never allocates. Thread-safe.
872 	template<typename It, typename Rep, typename Period>
wait_dequeue_bulk_timed(It itemFirst,size_t max,std::chrono::duration<Rep,Period> const & timeout)873 	inline size_t wait_dequeue_bulk_timed(It itemFirst, size_t max, std::chrono::duration<Rep, Period> const& timeout)
874     {
875         return wait_dequeue_bulk_timed<It&>(itemFirst, max, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
876     }
877 
878 	// Attempts to dequeue several elements from the queue using an explicit consumer token.
879 	// Returns the number of items actually dequeued, which will
880 	// always be at least one (this method blocks until the queue
881 	// is non-empty) and at most max.
882 	// Never allocates. Thread-safe.
883 	template<typename It>
wait_dequeue_bulk(consumer_token_t & token,It itemFirst,size_t max)884 	inline size_t wait_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
885 	{
886 		size_t count = 0;
887 		max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
888 		while (count != max) {
889 			count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
890 		}
891 		return count;
892 	}
893 
894 	// Attempts to dequeue several elements from the queue using an explicit consumer token.
895 	// Returns the number of items actually dequeued, which can
896 	// be 0 if the timeout expires while waiting for elements,
897 	// and at most max.
898 	// Using a negative timeout indicates an indefinite timeout,
899 	// and is thus functionally equivalent to calling wait_dequeue_bulk.
900 	// Never allocates. Thread-safe.
901 	template<typename It>
wait_dequeue_bulk_timed(consumer_token_t & token,It itemFirst,size_t max,std::int64_t timeout_usecs)902 	inline size_t wait_dequeue_bulk_timed(consumer_token_t& token, It itemFirst, size_t max, std::int64_t timeout_usecs)
903 	{
904 		size_t count = 0;
905 		max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs);
906 		while (count != max) {
907 			count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
908 		}
909 		return count;
910 	}
911 
912 	// Attempts to dequeue several elements from the queue using an explicit consumer token.
913 	// Returns the number of items actually dequeued, which can
914 	// be 0 if the timeout expires while waiting for elements,
915 	// and at most max.
916 	// Never allocates. Thread-safe.
917 	template<typename It, typename Rep, typename Period>
wait_dequeue_bulk_timed(consumer_token_t & token,It itemFirst,size_t max,std::chrono::duration<Rep,Period> const & timeout)918 	inline size_t wait_dequeue_bulk_timed(consumer_token_t& token, It itemFirst, size_t max, std::chrono::duration<Rep, Period> const& timeout)
919     {
920         return wait_dequeue_bulk_timed<It&>(token, itemFirst, max, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
921     }
922 
923 
924 	// Returns an estimate of the total number of elements currently in the queue. This
925 	// estimate is only accurate if the queue has completely stabilized before it is called
926 	// (i.e. all enqueue and dequeue operations have completed and their memory effects are
927 	// visible on the calling thread, and no further operations start while this method is
928 	// being called).
929 	// Thread-safe.
size_approx()930 	inline size_t size_approx() const
931 	{
932 		return (size_t)sema->availableApprox();
933 	}
934 
935 
936 	// Returns true if the underlying atomic variables used by
937 	// the queue are lock-free (they should be on most platforms).
938 	// Thread-safe.
is_lock_free()939 	static bool is_lock_free()
940 	{
941 		return ConcurrentQueue::is_lock_free();
942 	}
943 
944 
945 private:
946 	template<typename U>
create()947 	static inline U* create()
948 	{
949 		auto p = (Traits::malloc)(sizeof(U));
950 		return p != nullptr ? new (p) U : nullptr;
951 	}
952 
953 	template<typename U, typename A1>
create(A1 && a1)954 	static inline U* create(A1&& a1)
955 	{
956 		auto p = (Traits::malloc)(sizeof(U));
957 		return p != nullptr ? new (p) U(std::forward<A1>(a1)) : nullptr;
958 	}
959 
960 	template<typename U>
destroy(U * p)961 	static inline void destroy(U* p)
962 	{
963 		if (p != nullptr) {
964 			p->~U();
965 		}
966 		(Traits::free)(p);
967 	}
968 
969 private:
970 	ConcurrentQueue inner;
971 	std::unique_ptr<LightweightSemaphore, void (*)(LightweightSemaphore*)> sema;
972 };
973 
974 
975 template<typename T, typename Traits>
swap(BlockingConcurrentQueue<T,Traits> & a,BlockingConcurrentQueue<T,Traits> & b)976 inline void swap(BlockingConcurrentQueue<T, Traits>& a, BlockingConcurrentQueue<T, Traits>& b) MOODYCAMEL_NOEXCEPT
977 {
978 	a.swap(b);
979 }
980 
981 }	// end namespace moodycamel
982