1 // Provides an efficient blocking version of moodycamel::ConcurrentQueue.
2 // ©2015-2016 Cameron Desrochers. Distributed under the terms of the simplified
3 // BSD license, available at the top of concurrentqueue.h.
4 // Uses Jeff Preshing's semaphore implementation (under the terms of its
5 // separate zlib license, embedded below).
6
7 #pragma once
8
9 #include "concurrentqueue.h"
10 #include <type_traits>
11 #include <cerrno>
12 #include <memory>
13 #include <chrono>
14 #include <ctime>
15
16 #if defined(_WIN32)
17 // Avoid including windows.h in a header; we only need a handful of
18 // items, so we'll redeclare them here (this is relatively safe since
19 // the API generally has to remain stable between Windows versions).
20 // I know this is an ugly hack but it still beats polluting the global
21 // namespace with thousands of generic names or adding a .cpp for nothing.
22 extern "C" {
23 struct _SECURITY_ATTRIBUTES;
24 __declspec(dllimport) void* __stdcall CreateSemaphoreW(_SECURITY_ATTRIBUTES* lpSemaphoreAttributes, long lInitialCount, long lMaximumCount, const wchar_t* lpName);
25 __declspec(dllimport) int __stdcall CloseHandle(void* hObject);
26 __declspec(dllimport) unsigned long __stdcall WaitForSingleObject(void* hHandle, unsigned long dwMilliseconds);
27 __declspec(dllimport) int __stdcall ReleaseSemaphore(void* hSemaphore, long lReleaseCount, long* lpPreviousCount);
28 }
29 #elif defined(__MACH__)
30 #include <mach/mach.h>
31 #elif defined(__unix__)
32 #include <semaphore.h>
33 #endif
34
35 namespace moodycamel
36 {
37 namespace details
38 {
39 // Code in the mpmc_sema namespace below is an adaptation of Jeff Preshing's
40 // portable + lightweight semaphore implementations, originally from
41 // https://github.com/preshing/cpp11-on-multicore/blob/master/common/sema.h
42 // LICENSE:
43 // Copyright (c) 2015 Jeff Preshing
44 //
45 // This software is provided 'as-is', without any express or implied
46 // warranty. In no event will the authors be held liable for any damages
47 // arising from the use of this software.
48 //
49 // Permission is granted to anyone to use this software for any purpose,
50 // including commercial applications, and to alter it and redistribute it
51 // freely, subject to the following restrictions:
52 //
53 // 1. The origin of this software must not be misrepresented; you must not
54 // claim that you wrote the original software. If you use this software
55 // in a product, an acknowledgement in the product documentation would be
56 // appreciated but is not required.
57 // 2. Altered source versions must be plainly marked as such, and must not be
58 // misrepresented as being the original software.
59 // 3. This notice may not be removed or altered from any source distribution.
60 namespace mpmc_sema
61 {
62 #if defined(_WIN32)
//---------------------------------------------------------
// Semaphore (Windows)
// Thin RAII wrapper around a Win32 kernel semaphore handle.
//---------------------------------------------------------
class Semaphore
{
private:
	void* m_hSema;		// HANDLE returned by CreateSemaphoreW

	Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
	Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;

public:
	// Creates an anonymous semaphore with the given initial count.
	// NOTE(review): the CreateSemaphoreW result is not checked here; on
	// failure m_hSema would be null — presumably acceptable for this
	// library's no-exceptions paths, but worth confirming.
	Semaphore(int initialCount = 0)
	{
		assert(initialCount >= 0);
		const long maxLong = 0x7fffffff;	// maximum count: LONG_MAX
		m_hSema = CreateSemaphoreW(nullptr, initialCount, maxLong, nullptr);
	}

	~Semaphore()
	{
		CloseHandle(m_hSema);
	}

	// Blocks until the semaphore count can be decremented.
	void wait()
	{
		const unsigned long infinite = 0xffffffff;	// INFINITE
		WaitForSingleObject(m_hSema, infinite);
	}

	// Non-blocking decrement attempt; returns false only on timeout
	// (a zero-millisecond wait that expires).
	bool try_wait()
	{
		const unsigned long RC_WAIT_TIMEOUT = 0x00000102;	// WAIT_TIMEOUT
		return WaitForSingleObject(m_hSema, 0) != RC_WAIT_TIMEOUT;
	}

	// Waits up to `usecs` microseconds; returns false on timeout.
	// NOTE(review): the division truncates, so sub-millisecond timeouts
	// become a 0 ms wait (i.e. degenerate to try_wait) — confirm intended.
	bool timed_wait(std::uint64_t usecs)
	{
		const unsigned long RC_WAIT_TIMEOUT = 0x00000102;	// WAIT_TIMEOUT
		return WaitForSingleObject(m_hSema, (unsigned long)(usecs / 1000)) != RC_WAIT_TIMEOUT;
	}

	// Increments the semaphore count by `count`, releasing that many waiters.
	void signal(int count = 1)
	{
		ReleaseSemaphore(m_hSema, count, nullptr);
	}
};
107 #elif defined(__MACH__)
108 //---------------------------------------------------------
109 // Semaphore (Apple iOS and OSX)
110 // Can't use POSIX semaphores due to http://lists.apple.com/archives/darwin-kernel/2009/Apr/msg00010.html
111 //---------------------------------------------------------
112 class Semaphore
113 {
114 private:
115 semaphore_t m_sema;
116
117 Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
118 Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
119
120 public:
121 Semaphore(int initialCount = 0)
122 {
123 assert(initialCount >= 0);
124 semaphore_create(mach_task_self(), &m_sema, SYNC_POLICY_FIFO, initialCount);
125 }
126
127 ~Semaphore()
128 {
129 semaphore_destroy(mach_task_self(), m_sema);
130 }
131
132 void wait()
133 {
134 semaphore_wait(m_sema);
135 }
136
137 bool try_wait()
138 {
139 return timed_wait(0);
140 }
141
142 bool timed_wait(std::uint64_t timeout_usecs)
143 {
144 mach_timespec_t ts;
145 ts.tv_sec = timeout_usecs / 1000000;
146 ts.tv_nsec = (timeout_usecs % 1000000) * 1000;
147
148 // added in OSX 10.10: https://developer.apple.com/library/prerelease/mac/documentation/General/Reference/APIDiffsMacOSX10_10SeedDiff/modules/Darwin.html
149 kern_return_t rc = semaphore_timedwait(m_sema, ts);
150
151 return rc != KERN_OPERATION_TIMED_OUT;
152 }
153
154 void signal()
155 {
156 semaphore_signal(m_sema);
157 }
158
159 void signal(int count)
160 {
161 while (count-- > 0)
162 {
163 semaphore_signal(m_sema);
164 }
165 }
166 };
167 #elif defined(__unix__)
//---------------------------------------------------------
// Semaphore (POSIX, Linux)
// Thin RAII wrapper around an unnamed (process-local) sem_t.
//---------------------------------------------------------
class Semaphore
{
private:
	sem_t m_sema;

	Semaphore(const Semaphore& other) = delete;
	Semaphore& operator=(const Semaphore& other) = delete;

public:
	// Creates a process-private semaphore with the given initial count.
	Semaphore(int initialCount = 0)
	{
		assert(initialCount >= 0);
		sem_init(&m_sema, 0, initialCount);
	}

	~Semaphore()
	{
		sem_destroy(&m_sema);
	}

	// Blocks until the count can be decremented. Retries on EINTR, since
	// sem_wait can fail spuriously when interrupted (e.g. under a debugger):
	// http://stackoverflow.com/questions/2013181/gdb-causes-sem-wait-to-fail-with-eintr-error
	void wait()
	{
		int rc;
		do {
			rc = sem_wait(&m_sema);
		} while (rc == -1 && errno == EINTR);
	}

	// Non-blocking attempt; returns false only if the count was zero (EAGAIN).
	bool try_wait()
	{
		int rc;
		do {
			rc = sem_trywait(&m_sema);
		} while (rc == -1 && errno == EINTR);
		return !(rc == -1 && errno == EAGAIN);
	}

	// Waits up to `usecs` microseconds; returns false on timeout (ETIMEDOUT).
	bool timed_wait(std::uint64_t usecs)
	{
		struct timespec ts;
		const int usecs_in_1_sec = 1000000;
		const int nsecs_in_1_sec = 1000000000;
		clock_gettime(CLOCK_REALTIME, &ts);
		ts.tv_sec += usecs / usecs_in_1_sec;
		ts.tv_nsec += (usecs % usecs_in_1_sec) * 1000;
		// sem_timedwait requires tv_nsec to lie in [0, 1e9); the addition
		// above can overflow the field by at most one second, so a single
		// carry normalizes it.
		// Bug fix: the original compared with `>` — when tv_nsec landed on
		// exactly 1e9 (e.g. now.tv_nsec == 1000 and usecs % 1e6 == 999999),
		// the invalid value slipped through, sem_timedwait failed with
		// EINVAL, and the function falsely reported success. `>=` fixes it.
		if (ts.tv_nsec >= nsecs_in_1_sec) {
			ts.tv_nsec -= nsecs_in_1_sec;
			++ts.tv_sec;
		}

		int rc;
		do {
			rc = sem_timedwait(&m_sema, &ts);
		} while (rc == -1 && errno == EINTR);
		return !(rc == -1 && errno == ETIMEDOUT);
	}

	// Releases one waiter.
	void signal()
	{
		sem_post(&m_sema);
	}

	// Releases up to `count` waiters (sem_post has no bulk form).
	void signal(int count)
	{
		while (count-- > 0)
		{
			sem_post(&m_sema);
		}
	}
};
244 #else
245 #error Unsupported platform! (No semaphore wrapper available)
246 #endif
247
//---------------------------------------------------------
// LightweightSemaphore
// A "benaphore": an atomic count that absorbs most wait/signal traffic in
// user space, falling back to the kernel Semaphore only when the count
// goes negative (i.e. there are more waiters than available tokens).
//---------------------------------------------------------
class LightweightSemaphore
{
public:
	typedef std::make_signed<std::size_t>::type ssize_t;
	
private:
	// Logical token count. Positive: tokens available for immediate grab.
	// Negative: magnitude is (roughly) the number of threads blocked (or
	// about to block) on m_sema.
	std::atomic<ssize_t> m_count;
	Semaphore m_sema;

	// Slow path for wait(): spin briefly hoping a token shows up, then
	// fall back to blocking on the kernel semaphore. A negative timeout
	// means wait forever. Returns false only on timeout.
	bool waitWithPartialSpinning(std::int64_t timeout_usecs = -1)
	{
		ssize_t oldCount;
		// Is there a better way to set the initial spin count?
		// If we lower it to 1000, testBenaphore becomes 15x slower on my Core i7-5930K Windows PC,
		// as threads start hitting the kernel semaphore.
		int spin = 10000;
		while (--spin >= 0)
		{
			oldCount = m_count.load(std::memory_order_relaxed);
			if ((oldCount > 0) && m_count.compare_exchange_strong(oldCount, oldCount - 1, std::memory_order_acquire, std::memory_order_relaxed))
				return true;
			std::atomic_signal_fence(std::memory_order_acquire);	// Prevent the compiler from collapsing the loop.
		}
		// Register ourselves as a waiter by decrementing unconditionally;
		// if the previous value was positive we actually grabbed a token.
		oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
		if (oldCount > 0)
			return true;
		if (timeout_usecs < 0)
		{
			m_sema.wait();
			return true;
		}
		if (m_sema.timed_wait((std::uint64_t)timeout_usecs))
			return true;
		// At this point, we've timed out waiting for the semaphore, but the
		// count is still decremented indicating we may still be waiting on
		// it. So we have to re-adjust the count, but only if the semaphore
		// wasn't signaled enough times for us too since then. If it was, we
		// need to release the semaphore too.
		while (true)
		{
			oldCount = m_count.load(std::memory_order_acquire);
			if (oldCount >= 0 && m_sema.try_wait())
				return true;
			if (oldCount < 0 && m_count.compare_exchange_strong(oldCount, oldCount + 1, std::memory_order_relaxed))
				return false;
		}
	}

	// Slow path for waitMany(): acquire at least one token (spinning, then
	// blocking, same structure as above), then greedily grab up to max-1
	// more without blocking. Returns the number acquired (0 only on timeout).
	ssize_t waitManyWithPartialSpinning(ssize_t max, std::int64_t timeout_usecs = -1)
	{
		assert(max > 0);
		ssize_t oldCount;
		int spin = 10000;
		while (--spin >= 0)
		{
			oldCount = m_count.load(std::memory_order_relaxed);
			if (oldCount > 0)
			{
				ssize_t newCount = oldCount > max ? oldCount - max : 0;
				if (m_count.compare_exchange_strong(oldCount, newCount, std::memory_order_acquire, std::memory_order_relaxed))
					return oldCount - newCount;
			}
			std::atomic_signal_fence(std::memory_order_acquire);
		}
		oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
		if (oldCount <= 0)
		{
			if (timeout_usecs < 0)
				m_sema.wait();
			else if (!m_sema.timed_wait((std::uint64_t)timeout_usecs))
			{
				// Timed out: undo our pending-waiter decrement, unless a
				// signal raced in for us (in which case consume it).
				while (true)
				{
					oldCount = m_count.load(std::memory_order_acquire);
					if (oldCount >= 0 && m_sema.try_wait())
						break;
					if (oldCount < 0 && m_count.compare_exchange_strong(oldCount, oldCount + 1, std::memory_order_relaxed))
						return 0;
				}
			}
		}
		// We hold one token; opportunistically take more without waiting.
		if (max > 1)
			return 1 + tryWaitMany(max - 1);
		return 1;
	}

public:
	LightweightSemaphore(ssize_t initialCount = 0) : m_count(initialCount)
	{
		assert(initialCount >= 0);
	}

	// Attempts to acquire one token without blocking; false if none available.
	bool tryWait()
	{
		ssize_t oldCount = m_count.load(std::memory_order_relaxed);
		while (oldCount > 0)
		{
			if (m_count.compare_exchange_weak(oldCount, oldCount - 1, std::memory_order_acquire, std::memory_order_relaxed))
				return true;
		}
		return false;
	}

	// Acquires one token, blocking indefinitely if necessary.
	void wait()
	{
		if (!tryWait())
			waitWithPartialSpinning();
	}

	// Acquires one token, blocking up to timeout_usecs microseconds.
	// Returns false on timeout.
	bool wait(std::int64_t timeout_usecs)
	{
		return tryWait() || waitWithPartialSpinning(timeout_usecs);
	}

	// Acquires between 0 and (greedily) max, inclusive
	ssize_t tryWaitMany(ssize_t max)
	{
		assert(max >= 0);
		ssize_t oldCount = m_count.load(std::memory_order_relaxed);
		while (oldCount > 0)
		{
			ssize_t newCount = oldCount > max ? oldCount - max : 0;
			if (m_count.compare_exchange_weak(oldCount, newCount, std::memory_order_acquire, std::memory_order_relaxed))
				return oldCount - newCount;
		}
		return 0;
	}

	// Acquires at least one, and (greedily) at most max
	// (0 only if the timeout expires; negative timeout waits forever).
	ssize_t waitMany(ssize_t max, std::int64_t timeout_usecs)
	{
		assert(max >= 0);
		ssize_t result = tryWaitMany(max);
		if (result == 0 && max > 0)
			result = waitManyWithPartialSpinning(max, timeout_usecs);
		return result;
	}

	// Acquires at least one token, waiting indefinitely.
	ssize_t waitMany(ssize_t max)
	{
		ssize_t result = waitMany(max, -1);
		assert(result > 0);
		return result;
	}

	// Makes `count` tokens available, waking up to -oldCount blocked
	// waiters via the kernel semaphore (only as many as were waiting).
	void signal(ssize_t count = 1)
	{
		assert(count >= 0);
		ssize_t oldCount = m_count.fetch_add(count, std::memory_order_release);
		ssize_t toRelease = -oldCount < count ? -oldCount : count;
		if (toRelease > 0)
		{
			m_sema.signal((int)toRelease);
		}
	}
	
	// Approximate number of currently-available tokens (racy by nature).
	ssize_t availableApprox() const
	{
		ssize_t count = m_count.load(std::memory_order_relaxed);
		return count > 0 ? count : 0;
	}
};
413 } // end namespace mpmc_sema
414 } // end namespace details
415
416
417 // This is a blocking version of the queue. It has an almost identical interface to
418 // the normal non-blocking version, with the addition of various wait_dequeue() methods
419 // and the removal of producer-specific dequeue methods.
420 template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
421 class BlockingConcurrentQueue
422 {
private:
	// Convenience aliases for the wrapped lock-free queue and the
	// lightweight semaphore that tracks the number of enqueued elements.
	typedef ::moodycamel::ConcurrentQueue<T, Traits> ConcurrentQueue;
	typedef details::mpmc_sema::LightweightSemaphore LightweightSemaphore;

public:
	// Token and integral types re-exported from the inner queue so user
	// code can name them through this class directly.
	typedef typename ConcurrentQueue::producer_token_t producer_token_t;
	typedef typename ConcurrentQueue::consumer_token_t consumer_token_t;
	
	typedef typename ConcurrentQueue::index_t index_t;
	typedef typename ConcurrentQueue::size_t size_t;
	typedef typename std::make_signed<size_t>::type ssize_t;
	
	// Tuning constants forwarded verbatim from the inner queue's traits.
	static const size_t BLOCK_SIZE = ConcurrentQueue::BLOCK_SIZE;
	static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = ConcurrentQueue::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD;
	static const size_t EXPLICIT_INITIAL_INDEX_SIZE = ConcurrentQueue::EXPLICIT_INITIAL_INDEX_SIZE;
	static const size_t IMPLICIT_INITIAL_INDEX_SIZE = ConcurrentQueue::IMPLICIT_INITIAL_INDEX_SIZE;
	static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = ConcurrentQueue::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE;
	static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = ConcurrentQueue::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE;
	static const size_t MAX_SUBQUEUE_SIZE = ConcurrentQueue::MAX_SUBQUEUE_SIZE;
442
public:
	// Creates a queue with at least `capacity` element slots; note that the
	// actual number of elements that can be inserted without additional memory
	// allocation depends on the number of producers and the block size (e.g. if
	// the block size is equal to `capacity`, only a single block will be allocated
	// up-front, which means only a single producer will be able to enqueue elements
	// without an extra allocation -- blocks aren't shared between producers).
	// This method is not thread safe -- it is up to the user to ensure that the
	// queue is fully constructed before it starts being used by other threads (this
	// includes making the memory effects of construction visible, possibly with a
	// memory barrier).
	explicit BlockingConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
		: inner(capacity), sema(create<LightweightSemaphore>(), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
	{
		// Layout check: code elsewhere relies on `inner` living at offset 0
		// of this object. The fake (BlockingConcurrentQueue*)1 pointer lets
		// us compare member addresses without a real instance.
		assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
		// create<>() returns null on allocation failure — TODO confirm against
		// its definition (not visible in this chunk).
		if (!sema) {
			MOODYCAMEL_THROW(std::bad_alloc());
		}
	}
462
	// Like the capacity-only constructor, but also pre-sizes the queue's
	// internal structures for a fixed maximum number of explicit and
	// implicit producers. Not thread-safe (see above constructor's notes).
	BlockingConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
		: inner(minCapacity, maxExplicitProducers, maxImplicitProducers), sema(create<LightweightSemaphore>(), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
	{
		// Same layout invariant as the other constructor: `inner` must be
		// the first member of this object.
		assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
		if (!sema) {
			MOODYCAMEL_THROW(std::bad_alloc());
		}
	}
471
	// Disable copying and copy assignment (copying a live concurrent queue
	// has no sensible semantics; only moving/swapping is supported).
	BlockingConcurrentQueue(BlockingConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
	BlockingConcurrentQueue& operator=(BlockingConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
475
	// Moving is supported, but note that it is *not* a thread-safe operation.
	// Nobody can use the queue while it's being moved, and the memory effects
	// of that move must be propagated to other threads before they can use it.
	// Note: When a queue is moved, its tokens are still valid but can only be
	// used with the destination queue (i.e. semantically they are moved along
	// with the queue itself).
	BlockingConcurrentQueue(BlockingConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
		: inner(std::move(other.inner)), sema(std::move(other.sema))
	{ }
485
	// Move-assignment, implemented as a full state swap with `other`.
	// Not thread-safe; tokens follow the queue state they were created for.
	inline BlockingConcurrentQueue& operator=(BlockingConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
	{
		return swap_internal(other);
	}
490
	// Swaps this queue's state with the other's. Not thread-safe.
	// Swapping two queues does not invalidate their tokens, however
	// the tokens that were created for one queue must be used with
	// only the swapped queue (i.e. the tokens are tied to the
	// queue's movable state, not the object itself).
	inline void swap(BlockingConcurrentQueue& other) MOODYCAMEL_NOEXCEPT
	{
		swap_internal(other);
	}
500
501 private:
swap_internal(BlockingConcurrentQueue & other)502 BlockingConcurrentQueue& swap_internal(BlockingConcurrentQueue& other)
503 {
504 if (this == &other) {
505 return *this;
506 }
507
508 inner.swap(other.inner);
509 sema.swap(other.sema);
510 return *this;
511 }
512
513 public:
514 // Enqueues a single item (by copying it).
515 // Allocates memory if required. Only fails if memory allocation fails (or implicit
516 // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
517 // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
518 // Thread-safe.
enqueue(T const & item)519 inline bool enqueue(T const& item)
520 {
521 if (details::likely(inner.enqueue(item))) {
522 sema->signal();
523 return true;
524 }
525 return false;
526 }
527
528 // Enqueues a single item (by moving it, if possible).
529 // Allocates memory if required. Only fails if memory allocation fails (or implicit
530 // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
531 // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
532 // Thread-safe.
enqueue(T && item)533 inline bool enqueue(T&& item)
534 {
535 if (details::likely(inner.enqueue(std::move(item)))) {
536 sema->signal();
537 return true;
538 }
539 return false;
540 }
541
542 // Enqueues a single item (by copying it) using an explicit producer token.
543 // Allocates memory if required. Only fails if memory allocation fails (or
544 // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
545 // Thread-safe.
enqueue(producer_token_t const & token,T const & item)546 inline bool enqueue(producer_token_t const& token, T const& item)
547 {
548 if (details::likely(inner.enqueue(token, item))) {
549 sema->signal();
550 return true;
551 }
552 return false;
553 }
554
555 // Enqueues a single item (by moving it, if possible) using an explicit producer token.
556 // Allocates memory if required. Only fails if memory allocation fails (or
557 // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
558 // Thread-safe.
enqueue(producer_token_t const & token,T && item)559 inline bool enqueue(producer_token_t const& token, T&& item)
560 {
561 if (details::likely(inner.enqueue(token, std::move(item)))) {
562 sema->signal();
563 return true;
564 }
565 return false;
566 }
567
568 // Enqueues several items.
569 // Allocates memory if required. Only fails if memory allocation fails (or
570 // implicit production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
571 // is 0, or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
572 // Note: Use std::make_move_iterator if the elements should be moved instead of copied.
573 // Thread-safe.
574 template<typename It>
enqueue_bulk(It itemFirst,size_t count)575 inline bool enqueue_bulk(It itemFirst, size_t count)
576 {
577 if (details::likely(inner.enqueue_bulk(std::forward<It>(itemFirst), count))) {
578 sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
579 return true;
580 }
581 return false;
582 }
583
584 // Enqueues several items using an explicit producer token.
585 // Allocates memory if required. Only fails if memory allocation fails
586 // (or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
587 // Note: Use std::make_move_iterator if the elements should be moved
588 // instead of copied.
589 // Thread-safe.
590 template<typename It>
enqueue_bulk(producer_token_t const & token,It itemFirst,size_t count)591 inline bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
592 {
593 if (details::likely(inner.enqueue_bulk(token, std::forward<It>(itemFirst), count))) {
594 sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
595 return true;
596 }
597 return false;
598 }
599
600 // Enqueues a single item (by copying it).
601 // Does not allocate memory. Fails if not enough room to enqueue (or implicit
602 // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
603 // is 0).
604 // Thread-safe.
try_enqueue(T const & item)605 inline bool try_enqueue(T const& item)
606 {
607 if (inner.try_enqueue(item)) {
608 sema->signal();
609 return true;
610 }
611 return false;
612 }
613
614 // Enqueues a single item (by moving it, if possible).
615 // Does not allocate memory (except for one-time implicit producer).
616 // Fails if not enough room to enqueue (or implicit production is
617 // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
618 // Thread-safe.
try_enqueue(T && item)619 inline bool try_enqueue(T&& item)
620 {
621 if (inner.try_enqueue(std::move(item))) {
622 sema->signal();
623 return true;
624 }
625 return false;
626 }
627
628 // Enqueues a single item (by copying it) using an explicit producer token.
629 // Does not allocate memory. Fails if not enough room to enqueue.
630 // Thread-safe.
try_enqueue(producer_token_t const & token,T const & item)631 inline bool try_enqueue(producer_token_t const& token, T const& item)
632 {
633 if (inner.try_enqueue(token, item)) {
634 sema->signal();
635 return true;
636 }
637 return false;
638 }
639
640 // Enqueues a single item (by moving it, if possible) using an explicit producer token.
641 // Does not allocate memory. Fails if not enough room to enqueue.
642 // Thread-safe.
try_enqueue(producer_token_t const & token,T && item)643 inline bool try_enqueue(producer_token_t const& token, T&& item)
644 {
645 if (inner.try_enqueue(token, std::move(item))) {
646 sema->signal();
647 return true;
648 }
649 return false;
650 }
651
652 // Enqueues several items.
653 // Does not allocate memory (except for one-time implicit producer).
654 // Fails if not enough room to enqueue (or implicit production is
655 // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
656 // Note: Use std::make_move_iterator if the elements should be moved
657 // instead of copied.
658 // Thread-safe.
659 template<typename It>
try_enqueue_bulk(It itemFirst,size_t count)660 inline bool try_enqueue_bulk(It itemFirst, size_t count)
661 {
662 if (inner.try_enqueue_bulk(std::forward<It>(itemFirst), count)) {
663 sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
664 return true;
665 }
666 return false;
667 }
668
669 // Enqueues several items using an explicit producer token.
670 // Does not allocate memory. Fails if not enough room to enqueue.
671 // Note: Use std::make_move_iterator if the elements should be moved
672 // instead of copied.
673 // Thread-safe.
674 template<typename It>
try_enqueue_bulk(producer_token_t const & token,It itemFirst,size_t count)675 inline bool try_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
676 {
677 if (inner.try_enqueue_bulk(token, std::forward<It>(itemFirst), count)) {
678 sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
679 return true;
680 }
681 return false;
682 }
683
684
685 // Attempts to dequeue from the queue.
686 // Returns false if all producer streams appeared empty at the time they
687 // were checked (so, the queue is likely but not guaranteed to be empty).
688 // Never allocates. Thread-safe.
689 template<typename U>
try_dequeue(U & item)690 inline bool try_dequeue(U& item)
691 {
692 if (sema->tryWait()) {
693 while (!inner.try_dequeue(item)) {
694 continue;
695 }
696 return true;
697 }
698 return false;
699 }
700
701 // Attempts to dequeue from the queue using an explicit consumer token.
702 // Returns false if all producer streams appeared empty at the time they
703 // were checked (so, the queue is likely but not guaranteed to be empty).
704 // Never allocates. Thread-safe.
705 template<typename U>
try_dequeue(consumer_token_t & token,U & item)706 inline bool try_dequeue(consumer_token_t& token, U& item)
707 {
708 if (sema->tryWait()) {
709 while (!inner.try_dequeue(token, item)) {
710 continue;
711 }
712 return true;
713 }
714 return false;
715 }
716
717 // Attempts to dequeue several elements from the queue.
718 // Returns the number of items actually dequeued.
719 // Returns 0 if all producer streams appeared empty at the time they
720 // were checked (so, the queue is likely but not guaranteed to be empty).
721 // Never allocates. Thread-safe.
722 template<typename It>
try_dequeue_bulk(It itemFirst,size_t max)723 inline size_t try_dequeue_bulk(It itemFirst, size_t max)
724 {
725 size_t count = 0;
726 max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
727 while (count != max) {
728 count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
729 }
730 return count;
731 }
732
733 // Attempts to dequeue several elements from the queue using an explicit consumer token.
734 // Returns the number of items actually dequeued.
735 // Returns 0 if all producer streams appeared empty at the time they
736 // were checked (so, the queue is likely but not guaranteed to be empty).
737 // Never allocates. Thread-safe.
738 template<typename It>
try_dequeue_bulk(consumer_token_t & token,It itemFirst,size_t max)739 inline size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
740 {
741 size_t count = 0;
742 max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
743 while (count != max) {
744 count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
745 }
746 return count;
747 }
748
749
750
751 // Blocks the current thread until there's something to dequeue, then
752 // dequeues it.
753 // Never allocates. Thread-safe.
754 template<typename U>
wait_dequeue(U & item)755 inline void wait_dequeue(U& item)
756 {
757 sema->wait();
758 while (!inner.try_dequeue(item)) {
759 continue;
760 }
761 }
762
763 // Blocks the current thread until either there's something to dequeue
764 // or the timeout (specified in microseconds) expires. Returns false
765 // without setting `item` if the timeout expires, otherwise assigns
766 // to `item` and returns true.
767 // Using a negative timeout indicates an indefinite timeout,
768 // and is thus functionally equivalent to calling wait_dequeue.
769 // Never allocates. Thread-safe.
770 template<typename U>
wait_dequeue_timed(U & item,std::int64_t timeout_usecs)771 inline bool wait_dequeue_timed(U& item, std::int64_t timeout_usecs)
772 {
773 if (!sema->wait(timeout_usecs)) {
774 return false;
775 }
776 while (!inner.try_dequeue(item)) {
777 continue;
778 }
779 return true;
780 }
781
782 // Blocks the current thread until either there's something to dequeue
783 // or the timeout expires. Returns false without setting `item` if the
784 // timeout expires, otherwise assigns to `item` and returns true.
785 // Never allocates. Thread-safe.
786 template<typename U, typename Rep, typename Period>
wait_dequeue_timed(U & item,std::chrono::duration<Rep,Period> const & timeout)787 inline bool wait_dequeue_timed(U& item, std::chrono::duration<Rep, Period> const& timeout)
788 {
789 return wait_dequeue_timed(item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
790 }
791
792 // Blocks the current thread until there's something to dequeue, then
793 // dequeues it using an explicit consumer token.
794 // Never allocates. Thread-safe.
795 template<typename U>
wait_dequeue(consumer_token_t & token,U & item)796 inline void wait_dequeue(consumer_token_t& token, U& item)
797 {
798 sema->wait();
799 while (!inner.try_dequeue(token, item)) {
800 continue;
801 }
802 }
803
804 // Blocks the current thread until either there's something to dequeue
805 // or the timeout (specified in microseconds) expires. Returns false
806 // without setting `item` if the timeout expires, otherwise assigns
807 // to `item` and returns true.
808 // Using a negative timeout indicates an indefinite timeout,
809 // and is thus functionally equivalent to calling wait_dequeue.
810 // Never allocates. Thread-safe.
811 template<typename U>
wait_dequeue_timed(consumer_token_t & token,U & item,std::int64_t timeout_usecs)812 inline bool wait_dequeue_timed(consumer_token_t& token, U& item, std::int64_t timeout_usecs)
813 {
814 if (!sema->wait(timeout_usecs)) {
815 return false;
816 }
817 while (!inner.try_dequeue(token, item)) {
818 continue;
819 }
820 return true;
821 }
822
823 // Blocks the current thread until either there's something to dequeue
824 // or the timeout expires. Returns false without setting `item` if the
825 // timeout expires, otherwise assigns to `item` and returns true.
826 // Never allocates. Thread-safe.
827 template<typename U, typename Rep, typename Period>
wait_dequeue_timed(consumer_token_t & token,U & item,std::chrono::duration<Rep,Period> const & timeout)828 inline bool wait_dequeue_timed(consumer_token_t& token, U& item, std::chrono::duration<Rep, Period> const& timeout)
829 {
830 return wait_dequeue_timed(token, item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
831 }
832
833 // Attempts to dequeue several elements from the queue.
834 // Returns the number of items actually dequeued, which will
835 // always be at least one (this method blocks until the queue
836 // is non-empty) and at most max.
837 // Never allocates. Thread-safe.
838 template<typename It>
wait_dequeue_bulk(It itemFirst,size_t max)839 inline size_t wait_dequeue_bulk(It itemFirst, size_t max)
840 {
841 size_t count = 0;
842 max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
843 while (count != max) {
844 count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
845 }
846 return count;
847 }
848
849 // Attempts to dequeue several elements from the queue.
850 // Returns the number of items actually dequeued, which can
851 // be 0 if the timeout expires while waiting for elements,
852 // and at most max.
853 // Using a negative timeout indicates an indefinite timeout,
854 // and is thus functionally equivalent to calling wait_dequeue_bulk.
855 // Never allocates. Thread-safe.
856 template<typename It>
wait_dequeue_bulk_timed(It itemFirst,size_t max,std::int64_t timeout_usecs)857 inline size_t wait_dequeue_bulk_timed(It itemFirst, size_t max, std::int64_t timeout_usecs)
858 {
859 size_t count = 0;
860 max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs);
861 while (count != max) {
862 count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
863 }
864 return count;
865 }
866
867 // Attempts to dequeue several elements from the queue.
868 // Returns the number of items actually dequeued, which can
869 // be 0 if the timeout expires while waiting for elements,
870 // and at most max.
871 // Never allocates. Thread-safe.
872 template<typename It, typename Rep, typename Period>
wait_dequeue_bulk_timed(It itemFirst,size_t max,std::chrono::duration<Rep,Period> const & timeout)873 inline size_t wait_dequeue_bulk_timed(It itemFirst, size_t max, std::chrono::duration<Rep, Period> const& timeout)
874 {
875 return wait_dequeue_bulk_timed<It&>(itemFirst, max, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
876 }
877
878 // Attempts to dequeue several elements from the queue using an explicit consumer token.
879 // Returns the number of items actually dequeued, which will
880 // always be at least one (this method blocks until the queue
881 // is non-empty) and at most max.
882 // Never allocates. Thread-safe.
883 template<typename It>
wait_dequeue_bulk(consumer_token_t & token,It itemFirst,size_t max)884 inline size_t wait_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
885 {
886 size_t count = 0;
887 max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
888 while (count != max) {
889 count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
890 }
891 return count;
892 }
893
894 // Attempts to dequeue several elements from the queue using an explicit consumer token.
895 // Returns the number of items actually dequeued, which can
896 // be 0 if the timeout expires while waiting for elements,
897 // and at most max.
898 // Using a negative timeout indicates an indefinite timeout,
899 // and is thus functionally equivalent to calling wait_dequeue_bulk.
900 // Never allocates. Thread-safe.
901 template<typename It>
wait_dequeue_bulk_timed(consumer_token_t & token,It itemFirst,size_t max,std::int64_t timeout_usecs)902 inline size_t wait_dequeue_bulk_timed(consumer_token_t& token, It itemFirst, size_t max, std::int64_t timeout_usecs)
903 {
904 size_t count = 0;
905 max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs);
906 while (count != max) {
907 count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
908 }
909 return count;
910 }
911
912 // Attempts to dequeue several elements from the queue using an explicit consumer token.
913 // Returns the number of items actually dequeued, which can
914 // be 0 if the timeout expires while waiting for elements,
915 // and at most max.
916 // Never allocates. Thread-safe.
917 template<typename It, typename Rep, typename Period>
wait_dequeue_bulk_timed(consumer_token_t & token,It itemFirst,size_t max,std::chrono::duration<Rep,Period> const & timeout)918 inline size_t wait_dequeue_bulk_timed(consumer_token_t& token, It itemFirst, size_t max, std::chrono::duration<Rep, Period> const& timeout)
919 {
920 return wait_dequeue_bulk_timed<It&>(token, itemFirst, max, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
921 }
922
923
924 // Returns an estimate of the total number of elements currently in the queue. This
925 // estimate is only accurate if the queue has completely stabilized before it is called
926 // (i.e. all enqueue and dequeue operations have completed and their memory effects are
927 // visible on the calling thread, and no further operations start while this method is
928 // being called).
929 // Thread-safe.
size_approx()930 inline size_t size_approx() const
931 {
932 return (size_t)sema->availableApprox();
933 }
934
935
	// Returns true if the underlying atomic variables used by
	// the queue are lock-free (they should be on most platforms).
	// Thread-safe.
	static bool is_lock_free()
	{
		// Pure forwarder: the answer depends only on the inner queue type.
		return ConcurrentQueue::is_lock_free();
	}
943
944
945 private:
946 template<typename U>
create()947 static inline U* create()
948 {
949 auto p = (Traits::malloc)(sizeof(U));
950 return p != nullptr ? new (p) U : nullptr;
951 }
952
953 template<typename U, typename A1>
create(A1 && a1)954 static inline U* create(A1&& a1)
955 {
956 auto p = (Traits::malloc)(sizeof(U));
957 return p != nullptr ? new (p) U(std::forward<A1>(a1)) : nullptr;
958 }
959
960 template<typename U>
destroy(U * p)961 static inline void destroy(U* p)
962 {
963 if (p != nullptr) {
964 p->~U();
965 }
966 (Traits::free)(p);
967 }
968
private:
	// The wrapped lock-free queue that actually stores the elements.
	ConcurrentQueue inner;
	// Semaphore whose count tracks elements available for dequeue (see
	// size_approx()). Held through a custom-deleter unique_ptr;
	// NOTE(review): the deleter is presumably wired to destroy() in a
	// constructor outside this view — confirm against the full file.
	std::unique_ptr<LightweightSemaphore, void (*)(LightweightSemaphore*)> sema;
};
973
974
// Swaps the contents (and semaphores) of two blocking queues by delegating
// to the member swap(). Enables ADL/std::swap-style usage.
template<typename T, typename Traits>
inline void swap(BlockingConcurrentQueue<T, Traits>& a, BlockingConcurrentQueue<T, Traits>& b) MOODYCAMEL_NOEXCEPT
{
	a.swap(b);
}
980
981 } // end namespace moodycamel
982