// Provides a C++11 implementation of a multi-producer, multi-consumer lock-free queue.
// An overview, including benchmark results, is provided here:
//    http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++
// The full design is also described in excruciating detail at:
//    http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue

// Simplified BSD license:
// Copyright (c) 2013-2016, Cameron Desrochers.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification,
// are permitted provided that the following conditions are met:
//
// - Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
// - Redistributions in binary form must reproduce the above copyright notice, this list of
// conditions and the following disclaimer in the documentation and/or other materials
// provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
// OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
// HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
// TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
// EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


#pragma once

#if defined(__GNUC__)
// Disable -Wconversion warnings (spuriously triggered when Traits::size_t and
// Traits::index_t are set to < 32 bits, causing integer promotion, causing warnings
// upon assigning any computed values)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wconversion"

#ifdef MCDBGQ_USE_RELACY
#pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
#endif
#endif

#if defined(__APPLE__)
#include "TargetConditionals.h"
#endif

#ifdef MCDBGQ_USE_RELACY
#include "relacy/relacy_std.hpp"
#include "relacy_shims.h"
// We only use malloc/free anyway, and the delete macro messes up `= delete` method declarations.
// We'll override the default trait malloc ourselves without a macro.
#undef new
#undef delete
#undef malloc
#undef free
#else
#include <atomic>		// Requires C++11. Sorry VS2010.
#include <cassert>
#endif
#include <cstddef>		// for max_align_t
#include <cstdint>
#include <cstdlib>
#include <type_traits>
#include <algorithm>
#include <utility>
#include <limits>
#include <climits>		// for CHAR_BIT
#include <array>
#include <thread>		// partly for __WINPTHREADS_VERSION if on MinGW-w64 w/ POSIX threading

// Platform-specific definitions of a numeric thread ID type and an invalid value
namespace moodycamel { namespace details {
	template<typename thread_id_t> struct thread_id_converter {
		typedef thread_id_t thread_id_numeric_size_t;
		typedef thread_id_t thread_id_hash_t;
		static thread_id_hash_t prehash(thread_id_t const& x) { return x; }
	};
} }
#if defined(MCDBGQ_USE_RELACY)
namespace moodycamel { namespace details {
	typedef std::uint32_t thread_id_t;
	static const thread_id_t invalid_thread_id  = 0xFFFFFFFFU;
	static const thread_id_t invalid_thread_id2 = 0xFFFFFFFEU;
	static inline thread_id_t thread_id() { return rl::thread_index(); }
} }
#elif defined(_WIN32) || defined(__WINDOWS__) || defined(__WIN32__)
// No sense pulling in windows.h in a header, we'll manually declare the function
// we use and rely on backwards-compatibility for this not to break
extern "C" __declspec(dllimport) unsigned long __stdcall GetCurrentThreadId(void);
namespace moodycamel { namespace details {
	static_assert(sizeof(unsigned long) == sizeof(std::uint32_t), "Expected size of unsigned long to be 32 bits on Windows");
	typedef std::uint32_t thread_id_t;
	static const thread_id_t invalid_thread_id  = 0;			// See http://blogs.msdn.com/b/oldnewthing/archive/2004/02/23/78395.aspx
	static const thread_id_t invalid_thread_id2 = 0xFFFFFFFFU;	// Not technically guaranteed to be invalid, but is never used in practice. Note that all Win32 thread IDs are presently multiples of 4.
	static inline thread_id_t thread_id() { return static_cast<thread_id_t>(::GetCurrentThreadId()); }
} }
#elif defined(__arm__) || defined(_M_ARM) || defined(__aarch64__) || (defined(__APPLE__) && TARGET_OS_IPHONE)
namespace moodycamel { namespace details {
	static_assert(sizeof(std::thread::id) == 4 || sizeof(std::thread::id) == 8, "std::thread::id is expected to be either 4 or 8 bytes");
	
	typedef std::thread::id thread_id_t;
	static const thread_id_t invalid_thread_id;		// Default ctor creates invalid ID
	
	// Note we don't define an invalid_thread_id2 since std::thread::id doesn't have one; it's
	// only used if MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is defined anyway, which it won't
	// be.
	static inline thread_id_t thread_id() { return std::this_thread::get_id(); }
	
	template<std::size_t> struct thread_id_size { };
	template<> struct thread_id_size<4> { typedef std::uint32_t numeric_t; };
	template<> struct thread_id_size<8> { typedef std::uint64_t numeric_t; };
	
	template<> struct thread_id_converter<thread_id_t> {
		typedef thread_id_size<sizeof(thread_id_t)>::numeric_t thread_id_numeric_size_t;
#ifndef __APPLE__
		typedef std::size_t thread_id_hash_t;
#else
		typedef thread_id_numeric_size_t thread_id_hash_t;
#endif
		
		static thread_id_hash_t prehash(thread_id_t const& x)
		{
#ifndef __APPLE__
			return std::hash<std::thread::id>()(x);
#else
			return *reinterpret_cast<thread_id_hash_t const*>(&x);
#endif
		}
	};
} }
#else
// Use a nice trick from this answer: http://stackoverflow.com/a/8438730/21475
// In order to get a numeric thread ID in a platform-independent way, we use a thread-local
// static variable's address as a thread identifier :-)
#if defined(__GNUC__) || defined(__INTEL_COMPILER)
#define MOODYCAMEL_THREADLOCAL __thread
#elif defined(_MSC_VER)
#define MOODYCAMEL_THREADLOCAL __declspec(thread)
#else
// Assume C++11 compliant compiler
#define MOODYCAMEL_THREADLOCAL thread_local
#endif
namespace moodycamel { namespace details {
	typedef std::uintptr_t thread_id_t;
	static const thread_id_t invalid_thread_id  = 0;		// Address can't be nullptr
	static const thread_id_t invalid_thread_id2 = 1;		// Member accesses off a null pointer are also generally invalid. Plus it's not aligned.
	static inline thread_id_t thread_id() { static MOODYCAMEL_THREADLOCAL int x; return reinterpret_cast<thread_id_t>(&x); }
} }
#endif

// Exceptions
#ifndef MOODYCAMEL_EXCEPTIONS_ENABLED
#if (defined(_MSC_VER) && defined(_CPPUNWIND)) || (defined(__GNUC__) && defined(__EXCEPTIONS)) || (!defined(_MSC_VER) && !defined(__GNUC__))
#define MOODYCAMEL_EXCEPTIONS_ENABLED
#endif
#endif
#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
#define MOODYCAMEL_TRY try
#define MOODYCAMEL_CATCH(...) catch(__VA_ARGS__)
#define MOODYCAMEL_RETHROW throw
#define MOODYCAMEL_THROW(expr) throw (expr)
#else
#define MOODYCAMEL_TRY if (true)
#define MOODYCAMEL_CATCH(...) else if (false)
#define MOODYCAMEL_RETHROW
#define MOODYCAMEL_THROW(expr)
#endif

#ifndef MOODYCAMEL_NOEXCEPT
#if !defined(MOODYCAMEL_EXCEPTIONS_ENABLED)
#define MOODYCAMEL_NOEXCEPT
#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) true
#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) true
#elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1800
// VS2012's std::is_nothrow_[move_]constructible is broken and returns true when it shouldn't :-(
// We have to assume *all* non-trivial constructors may throw on VS2012!
#define MOODYCAMEL_NOEXCEPT _NOEXCEPT
#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value)
#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
#elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1900
#define MOODYCAMEL_NOEXCEPT _NOEXCEPT
#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value || std::is_nothrow_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value || std::is_nothrow_copy_constructible<type>::value)
#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
#else
#define MOODYCAMEL_NOEXCEPT noexcept
#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) noexcept(expr)
#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) noexcept(expr)
#endif
#endif

#ifndef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
#ifdef MCDBGQ_USE_RELACY
#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
#else
// VS2013 doesn't support `thread_local`, and MinGW-w64 w/ POSIX threading has a crippling bug: http://sourceforge.net/p/mingw-w64/bugs/445
// g++ <=4.7 doesn't support thread_local either.
// Finally, iOS/ARM doesn't have support for it either, and g++/ARM allows it to compile but it's unconfirmed to actually work
#if (!defined(_MSC_VER) || _MSC_VER >= 1900) && (!defined(__MINGW32__) && !defined(__MINGW64__) || !defined(__WINPTHREADS_VERSION)) && (!defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)) && (!defined(__APPLE__) || !TARGET_OS_IPHONE) && !defined(__arm__) && !defined(_M_ARM) && !defined(__aarch64__)
// Assume `thread_local` is fully supported in all other C++11 compilers/platforms
//#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED		// always disabled for now since several users report having problems with it on
#endif
#endif
#endif

// VS2012 doesn't support deleted functions.
// In this case, we declare the function normally but don't define it. A link error will be generated if the function is called.
#ifndef MOODYCAMEL_DELETE_FUNCTION
#if defined(_MSC_VER) && _MSC_VER < 1800
#define MOODYCAMEL_DELETE_FUNCTION
#else
#define MOODYCAMEL_DELETE_FUNCTION = delete
#endif
#endif

// Compiler-specific likely/unlikely hints
namespace moodycamel { namespace details {
#if defined(__GNUC__)
	inline bool likely(bool x) { return __builtin_expect((x), true); }
	inline bool unlikely(bool x) { return __builtin_expect((x), false); }
#else
	inline bool likely(bool x) { return x; }
	inline bool unlikely(bool x) { return x; }
#endif
} }

#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
#include "internal/concurrentqueue_internal_debug.h"
#endif

namespace moodycamel {
namespace details {
	template<typename T>
	struct const_numeric_max {
		static_assert(std::is_integral<T>::value, "const_numeric_max can only be used with integers");
		static const T value = std::numeric_limits<T>::is_signed
			? (static_cast<T>(1) << (sizeof(T) * CHAR_BIT - 1)) - static_cast<T>(1)
			: static_cast<T>(-1);
	};

#if defined(__GNUC__) && !defined( __clang__ )
	typedef ::max_align_t max_align_t;		// GCC forgot to add it to std:: for a while
#else
	typedef std::max_align_t max_align_t;	// Others (e.g. MSVC) insist it can *only* be accessed via std::
#endif
}

// Default traits for the ConcurrentQueue. To change some of the
// traits without re-implementing all of them, inherit from this
// struct and shadow the declarations you wish to be different;
// since the traits are used as a template type parameter, the
// shadowed declarations will be used where defined, and the defaults
// otherwise.
struct ConcurrentQueueDefaultTraits
{
	// General-purpose size type. std::size_t is strongly recommended.
	typedef std::size_t size_t;
	
	// The type used for the enqueue and dequeue indices. Must be at least as
	// large as size_t. Should be significantly larger than the number of elements
	// you expect to hold at once, especially if you have a high turnover rate;
	// for example, on 32-bit x86, if you expect to have over a hundred million
	// elements or pump several million elements through your queue in a very
	// short space of time, using a 32-bit type *may* trigger a race condition.
	// A 64-bit int type is recommended in that case, and in practice will
	// prevent a race condition no matter the usage of the queue. Note that
	// whether the queue is lock-free with a 64-bit type depends on whether
	// std::atomic<std::uint64_t> is lock-free, which is platform-specific.
	typedef std::size_t index_t;
	
	// Internally, all elements are enqueued and dequeued from multi-element
	// blocks; this is the smallest controllable unit. If you expect few elements
	// but many producers, a smaller block size should be favoured. For few producers
	// and/or many elements, a larger block size is preferred. A sane default
	// is provided. Must be a power of 2.
	static const size_t BLOCK_SIZE = 32;
	
	// For explicit producers (i.e. when using a producer token), the block is
	// checked for being empty by iterating through a list of flags, one per element.
	// For large block sizes, this is too inefficient, and switching to an atomic
	// counter-based approach is faster. The switch is made for block sizes strictly
	// larger than this threshold.
	static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = 32;
	
	// How many full blocks can be expected for a single explicit producer? This should
	// reflect that number's maximum for optimal performance. Must be a power of 2.
	static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 32;
	
	// How many full blocks can be expected for a single implicit producer? This should
	// reflect that number's maximum for optimal performance. Must be a power of 2.
	static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 32;
	
	// The initial size of the hash table mapping thread IDs to implicit producers.
	// Note that the hash is resized every time it becomes half full.
	// Must be a power of two, and either 0 or at least 1. If 0, implicit production
	// (using the enqueue methods without an explicit producer token) is disabled.
	static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = 32;
	
	// Controls the number of items that an explicit consumer (i.e. one with a token)
	// must consume before it causes all consumers to rotate and move on to the next
	// internal queue.
	static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = 256;
	
	// The maximum number of elements (inclusive) that can be enqueued to a sub-queue.
	// Enqueue operations that would cause this limit to be surpassed will fail. Note
	// that this limit is enforced at the block level (for performance reasons), i.e.
	// it's rounded up to the nearest block size.
	static const size_t MAX_SUBQUEUE_SIZE = details::const_numeric_max<size_t>::value;
	
	
#ifndef MCDBGQ_USE_RELACY
	// Memory allocation can be customized if needed.
	// malloc should return nullptr on failure, and handle alignment like std::malloc.
#if defined(malloc) || defined(free)
	// Gah, this is 2015, stop defining macros that break standard code already!
	// Work around malloc/free being special macros:
	static inline void* WORKAROUND_malloc(size_t size) { return malloc(size); }
	static inline void WORKAROUND_free(void* ptr) { return free(ptr); }
	static inline void* (malloc)(size_t size) { return WORKAROUND_malloc(size); }
	static inline void (free)(void* ptr) { return WORKAROUND_free(ptr); }
#else
	static inline void* malloc(size_t size) { return std::malloc(size); }
	static inline void free(void* ptr) { return std::free(ptr); }
#endif
#else
	// Debug versions when running under the Relacy race detector (ignore
	// these in user code)
	static inline void* malloc(size_t size) { return rl::rl_malloc(size, $); }
	static inline void free(void* ptr) { return rl::rl_free(ptr, $); }
#endif
};

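// Example (illustrative sketch only; `MyTraits` and the value 64 are arbitrary):
// as described above, traits are customized by inheriting from
// ConcurrentQueueDefaultTraits, shadowing just the members you want to change,
// and passing the struct as the queue's second template parameter:
//
//    struct MyTraits : public moodycamel::ConcurrentQueueDefaultTraits
//    {
//        static const size_t BLOCK_SIZE = 64;		// must be a power of 2
//    };
//    moodycamel::ConcurrentQueue<int, MyTraits> q;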

// When producing or consuming many elements, the most efficient way is to:
//    1) Use one of the bulk-operation methods of the queue with a token
//    2) Failing that, use the bulk-operation methods without a token
//    3) Failing that, create a token and use that with the single-item methods
//    4) Failing that, use the single-parameter methods of the queue
// Having said that, don't create tokens willy-nilly -- ideally there should be
// a maximum of one token per thread (of each kind).
struct ProducerToken;
struct ConsumerToken;

template<typename T, typename Traits> class ConcurrentQueue;
template<typename T, typename Traits> class BlockingConcurrentQueue;
class ConcurrentQueueTests;


namespace details
{
	struct ConcurrentQueueProducerTypelessBase
	{
		ConcurrentQueueProducerTypelessBase* next;
		std::atomic<bool> inactive;
		ProducerToken* token;
		
		ConcurrentQueueProducerTypelessBase()
			: next(nullptr), inactive(false), token(nullptr)
		{
		}
	};
	
	template<bool use32> struct _hash_32_or_64 {
		static inline std::uint32_t hash(std::uint32_t h)
		{
			// MurmurHash3 finalizer -- see https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp
			// Since the thread ID is already unique, all we really want to do is propagate that
			// uniqueness evenly across all the bits, so that we can use a subset of the bits while
			// reducing collisions significantly
			h ^= h >> 16;
			h *= 0x85ebca6b;
			h ^= h >> 13;
			h *= 0xc2b2ae35;
			return h ^ (h >> 16);
		}
	};
	template<> struct _hash_32_or_64<1> {
		static inline std::uint64_t hash(std::uint64_t h)
		{
			h ^= h >> 33;
			h *= 0xff51afd7ed558ccd;
			h ^= h >> 33;
			h *= 0xc4ceb9fe1a85ec53;
			return h ^ (h >> 33);
		}
	};
	template<std::size_t size> struct hash_32_or_64 : public _hash_32_or_64<(size > 4)> { };
	
	static inline size_t hash_thread_id(thread_id_t id)
	{
		static_assert(sizeof(thread_id_t) <= 8, "Expected a platform where thread IDs are at most 64-bit values");
		return static_cast<size_t>(hash_32_or_64<sizeof(thread_id_converter<thread_id_t>::thread_id_hash_t)>::hash(
			thread_id_converter<thread_id_t>::prehash(id)));
	}
	
	template<typename T>
	static inline bool circular_less_than(T a, T b)
	{
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable: 4554)
#endif
		static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "circular_less_than is intended to be used only with unsigned integer types");
		return static_cast<T>(a - b) > static_cast<T>(static_cast<T>(1) << static_cast<T>(sizeof(T) * CHAR_BIT - 1));
#ifdef _MSC_VER
#pragma warning(pop)
#endif
	}
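	// For example (illustrative values, using a hypothetical 8-bit index type):
	// circular_less_than<std::uint8_t>(250, 5) is true, since 250 - 5 == 245 > 128,
	// i.e. 5 is "ahead of" 250 once the index wraps around; conversely,
	// circular_less_than<std::uint8_t>(5, 250) is false (5 - 250 wraps to 11 <= 128).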
	
	template<typename U>
	static inline char* align_for(char* ptr)
	{
		const std::size_t alignment = std::alignment_of<U>::value;
		return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
	}
	
	template<typename T>
	static inline T ceil_to_pow_2(T x)
	{
		static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "ceil_to_pow_2 is intended to be used only with unsigned integer types");
		
		// Adapted from http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
		--x;
		x |= x >> 1;
		x |= x >> 2;
		x |= x >> 4;
		for (std::size_t i = 1; i < sizeof(T); i <<= 1) {
			x |= x >> (i << 3);
		}
		++x;
		return x;
	}
	
	template<typename T>
	static inline void swap_relaxed(std::atomic<T>& left, std::atomic<T>& right)
	{
		T temp = std::move(left.load(std::memory_order_relaxed));
		left.store(std::move(right.load(std::memory_order_relaxed)), std::memory_order_relaxed);
		right.store(std::move(temp), std::memory_order_relaxed);
	}
	
	template<typename T>
	static inline T const& nomove(T const& x)
	{
		return x;
	}
	
	template<bool Enable>
	struct nomove_if
	{
		template<typename T>
		static inline T const& eval(T const& x)
		{
			return x;
		}
	};
	
	template<>
	struct nomove_if<false>
	{
		template<typename U>
		static inline auto eval(U&& x)
			-> decltype(std::forward<U>(x))
		{
			return std::forward<U>(x);
		}
	};
	
	template<typename It>
	static inline auto deref_noexcept(It& it) MOODYCAMEL_NOEXCEPT -> decltype(*it)
	{
		return *it;
	}
	
#if defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)
	template<typename T> struct is_trivially_destructible : std::is_trivially_destructible<T> { };
#else
	template<typename T> struct is_trivially_destructible : std::has_trivial_destructor<T> { };
#endif
	
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
#ifdef MCDBGQ_USE_RELACY
	typedef RelacyThreadExitListener ThreadExitListener;
	typedef RelacyThreadExitNotifier ThreadExitNotifier;
#else
	struct ThreadExitListener
	{
		typedef void (*callback_t)(void*);
		callback_t callback;
		void* userData;
		
		ThreadExitListener* next;		// reserved for use by the ThreadExitNotifier
	};
	
	
	class ThreadExitNotifier
	{
	public:
		static void subscribe(ThreadExitListener* listener)
		{
			auto& tlsInst = instance();
			listener->next = tlsInst.tail;
			tlsInst.tail = listener;
		}
		
		static void unsubscribe(ThreadExitListener* listener)
		{
			auto& tlsInst = instance();
			ThreadExitListener** prev = &tlsInst.tail;
			for (auto ptr = tlsInst.tail; ptr != nullptr; ptr = ptr->next) {
				if (ptr == listener) {
					*prev = ptr->next;
					break;
				}
				prev = &ptr->next;
			}
		}
		
	private:
		ThreadExitNotifier() : tail(nullptr) { }
		ThreadExitNotifier(ThreadExitNotifier const&) MOODYCAMEL_DELETE_FUNCTION;
		ThreadExitNotifier& operator=(ThreadExitNotifier const&) MOODYCAMEL_DELETE_FUNCTION;
		
		~ThreadExitNotifier()
		{
			// This thread is about to exit, let everyone know!
			assert(this == &instance() && "If this assert fails, you likely have a buggy compiler! Change the preprocessor conditions such that MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is no longer defined.");
			for (auto ptr = tail; ptr != nullptr; ptr = ptr->next) {
				ptr->callback(ptr->userData);
			}
		}
		
		// Thread-local
		static inline ThreadExitNotifier& instance()
		{
			static thread_local ThreadExitNotifier notifier;
			return notifier;
		}
		
	private:
		ThreadExitListener* tail;
	};
#endif
#endif
	
	template<typename T> struct static_is_lock_free_num { enum { value = 0 }; };
	template<> struct static_is_lock_free_num<signed char> { enum { value = ATOMIC_CHAR_LOCK_FREE }; };
	template<> struct static_is_lock_free_num<short> { enum { value = ATOMIC_SHORT_LOCK_FREE }; };
	template<> struct static_is_lock_free_num<int> { enum { value = ATOMIC_INT_LOCK_FREE }; };
	template<> struct static_is_lock_free_num<long> { enum { value = ATOMIC_LONG_LOCK_FREE }; };
	template<> struct static_is_lock_free_num<long long> { enum { value = ATOMIC_LLONG_LOCK_FREE }; };
	template<typename T> struct static_is_lock_free : static_is_lock_free_num<typename std::make_signed<T>::type> { };
	template<> struct static_is_lock_free<bool> { enum { value = ATOMIC_BOOL_LOCK_FREE }; };
	template<typename U> struct static_is_lock_free<U*> { enum { value = ATOMIC_POINTER_LOCK_FREE }; };
}


struct ProducerToken
{
	template<typename T, typename Traits>
	explicit ProducerToken(ConcurrentQueue<T, Traits>& queue);
	
	template<typename T, typename Traits>
	explicit ProducerToken(BlockingConcurrentQueue<T, Traits>& queue);
	
	ProducerToken(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
		: producer(other.producer)
	{
		other.producer = nullptr;
		if (producer != nullptr) {
			producer->token = this;
		}
	}
	
	inline ProducerToken& operator=(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
	{
		swap(other);
		return *this;
	}
	
	void swap(ProducerToken& other) MOODYCAMEL_NOEXCEPT
	{
		std::swap(producer, other.producer);
		if (producer != nullptr) {
			producer->token = this;
		}
		if (other.producer != nullptr) {
			other.producer->token = &other;
		}
	}
	
	// A token is always valid unless:
	//     1) Memory allocation failed during construction
	//     2) It was moved via the move constructor
	//        (Note: assignment does a swap, leaving both potentially valid)
	//     3) The associated queue was destroyed
	// Note that if valid() returns true, that only indicates
	// that the token is valid for use with a specific queue,
	// but not which one; that's up to the user to track.
	inline bool valid() const { return producer != nullptr; }
	
	~ProducerToken()
	{
		if (producer != nullptr) {
			producer->token = nullptr;
			producer->inactive.store(true, std::memory_order_release);
		}
	}
	
	// Disable copying and assignment
	ProducerToken(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;
	ProducerToken& operator=(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;
	
private:
	template<typename T, typename Traits> friend class ConcurrentQueue;
	friend class ConcurrentQueueTests;
	
protected:
	details::ConcurrentQueueProducerTypelessBase* producer;
};


struct ConsumerToken
{
	template<typename T, typename Traits>
	explicit ConsumerToken(ConcurrentQueue<T, Traits>& q);
	
	template<typename T, typename Traits>
	explicit ConsumerToken(BlockingConcurrentQueue<T, Traits>& q);
	
	ConsumerToken(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
		: initialOffset(other.initialOffset), lastKnownGlobalOffset(other.lastKnownGlobalOffset), itemsConsumedFromCurrent(other.itemsConsumedFromCurrent), currentProducer(other.currentProducer), desiredProducer(other.desiredProducer)
	{
	}
	
	inline ConsumerToken& operator=(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
	{
		swap(other);
		return *this;
	}
	
	void swap(ConsumerToken& other) MOODYCAMEL_NOEXCEPT
	{
		std::swap(initialOffset, other.initialOffset);
		std::swap(lastKnownGlobalOffset, other.lastKnownGlobalOffset);
		std::swap(itemsConsumedFromCurrent, other.itemsConsumedFromCurrent);
		std::swap(currentProducer, other.currentProducer);
		std::swap(desiredProducer, other.desiredProducer);
	}
	
	// Disable copying and assignment
	ConsumerToken(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;
	ConsumerToken& operator=(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;
	
private:
	template<typename T, typename Traits> friend class ConcurrentQueue;
	friend class ConcurrentQueueTests;
	
private: // but shared with ConcurrentQueue
	std::uint32_t initialOffset;
	std::uint32_t lastKnownGlobalOffset;
	std::uint32_t itemsConsumedFromCurrent;
	details::ConcurrentQueueProducerTypelessBase* currentProducer;
	details::ConcurrentQueueProducerTypelessBase* desiredProducer;
};

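// Example (illustrative sketch; the element type, sizes, and variable names are
// arbitrary): following the guidance above, a long-lived thread creates one token
// of each kind it needs and reuses it, preferring the bulk operations:
//
//    moodycamel::ConcurrentQueue<int> q;
//    moodycamel::ProducerToken ptok(q);
//    moodycamel::ConsumerToken ctok(q);
//    int in[10] = { 0 };
//    q.enqueue_bulk(ptok, in, 10);                    // token-based bulk enqueue
//    int out[10];
//    size_t n = q.try_dequeue_bulk(ctok, out, 10);    // token-based bulk dequeue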

// Need to forward-declare this swap because it's in a namespace.
// See http://stackoverflow.com/questions/4492062/why-does-a-c-friend-class-need-a-forward-declaration-only-in-other-namespaces
template<typename T, typename Traits>
inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& a, typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& b) MOODYCAMEL_NOEXCEPT;


template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
class ConcurrentQueue
{
public:
	typedef ::moodycamel::ProducerToken producer_token_t;
	typedef ::moodycamel::ConsumerToken consumer_token_t;
	
	typedef typename Traits::index_t index_t;
	typedef typename Traits::size_t size_t;
	
	static const size_t BLOCK_SIZE = static_cast<size_t>(Traits::BLOCK_SIZE);
	static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = static_cast<size_t>(Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD);
	static const size_t EXPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::EXPLICIT_INITIAL_INDEX_SIZE);
	static const size_t IMPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::IMPLICIT_INITIAL_INDEX_SIZE);
	static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = static_cast<size_t>(Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE);
	static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = static_cast<std::uint32_t>(Traits::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE);
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable: 4307)		// + integral constant overflow (that's what the ternary expression is for!)
#pragma warning(disable: 4309)		// static_cast: Truncation of constant value
#endif
	static const size_t MAX_SUBQUEUE_SIZE = (details::const_numeric_max<size_t>::value - static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) < BLOCK_SIZE) ? details::const_numeric_max<size_t>::value : ((static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) + (BLOCK_SIZE - 1)) / BLOCK_SIZE * BLOCK_SIZE);
#ifdef _MSC_VER
#pragma warning(pop)
#endif
	
	static_assert(!std::numeric_limits<size_t>::is_signed && std::is_integral<size_t>::value, "Traits::size_t must be an unsigned integral type");
	static_assert(!std::numeric_limits<index_t>::is_signed && std::is_integral<index_t>::value, "Traits::index_t must be an unsigned integral type");
	static_assert(sizeof(index_t) >= sizeof(size_t), "Traits::index_t must be at least as wide as Traits::size_t");
	static_assert((BLOCK_SIZE > 1) && !(BLOCK_SIZE & (BLOCK_SIZE - 1)), "Traits::BLOCK_SIZE must be a power of 2 (and at least 2)");
	static_assert((EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD > 1) && !(EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD & (EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD - 1)), "Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD must be a power of 2 (and greater than 1)");
	static_assert((EXPLICIT_INITIAL_INDEX_SIZE > 1) && !(EXPLICIT_INITIAL_INDEX_SIZE & (EXPLICIT_INITIAL_INDEX_SIZE - 1)), "Traits::EXPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
	static_assert((IMPLICIT_INITIAL_INDEX_SIZE > 1) && !(IMPLICIT_INITIAL_INDEX_SIZE & (IMPLICIT_INITIAL_INDEX_SIZE - 1)), "Traits::IMPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
	static_assert((INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) || !(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE & (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE - 1)), "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be a power of 2");
	static_assert(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0 || INITIAL_IMPLICIT_PRODUCER_HASH_SIZE >= 1, "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be at least 1 (or 0 to disable implicit enqueueing)");

public:
	// Creates a queue with at least `capacity` element slots; note that the
	// actual number of elements that can be inserted without additional memory
	// allocation depends on the number of producers and the block size (e.g. if
	// the block size is equal to `capacity`, only a single block will be allocated
	// up-front, which means only a single producer will be able to enqueue elements
	// without an extra allocation -- blocks aren't shared between producers).
	// This method is not thread safe -- it is up to the user to ensure that the
	// queue is fully constructed before it starts being used by other threads (this
	// includes making the memory effects of construction visible, possibly with a
	// memory barrier).
	explicit ConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
		: producerListTail(nullptr),
		producerCount(0),
		initialBlockPoolIndex(0),
		nextExplicitConsumerId(0),
		globalExplicitConsumerOffset(0)
	{
		implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
		populate_initial_implicit_producer_hash();
		populate_initial_block_list(capacity / BLOCK_SIZE + ((capacity & (BLOCK_SIZE - 1)) == 0 ? 0 : 1));
		
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
		// Track all the producers using a fully-resolved typed list for
		// each kind; this makes it possible to debug them starting from
		// the root queue object (otherwise wacky casts are needed that
		// don't compile in the debugger's expression evaluator).
		explicitProducers.store(nullptr, std::memory_order_relaxed);
		implicitProducers.store(nullptr, std::memory_order_relaxed);
#endif
	}
	
	// Computes the correct amount of pre-allocated blocks for you based
	// on the minimum number of elements you want available at any given
	// time, and the maximum concurrent number of each type of producer.
	ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
		: producerListTail(nullptr),
		producerCount(0),
		initialBlockPoolIndex(0),
		nextExplicitConsumerId(0),
		globalExplicitConsumerOffset(0)
	{
		implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
		populate_initial_implicit_producer_hash();
		size_t blocks = (((minCapacity + BLOCK_SIZE - 1) / BLOCK_SIZE) - 1) * (maxExplicitProducers + 1) + 2 * (maxExplicitProducers + maxImplicitProducers);
		populate_initial_block_list(blocks);
		
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
		explicitProducers.store(nullptr, std::memory_order_relaxed);
		implicitProducers.store(nullptr, std::memory_order_relaxed);
#endif
	}
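	
	// Worked example for the constructor above (illustrative numbers only): with the
	// default BLOCK_SIZE of 32, ConcurrentQueue<int> q(1024, 2, 4) pre-allocates
	// ((1024 + 31) / 32 - 1) * (2 + 1) + 2 * (2 + 4) = 31 * 3 + 12 = 105 blocks.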
	
	// Note: The queue should not be accessed concurrently while it's
	// being deleted. It's up to the user to synchronize this.
	// This method is not thread safe.
	~ConcurrentQueue()
	{
		// Destroy producers
		auto ptr = producerListTail.load(std::memory_order_relaxed);
		while (ptr != nullptr) {
			auto next = ptr->next_prod();
			if (ptr->token != nullptr) {
				ptr->token->producer = nullptr;
			}
			destroy(ptr);
			ptr = next;
		}
		
		// Destroy implicit producer hash tables
		if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE != 0) {
			auto hash = implicitProducerHash.load(std::memory_order_relaxed);
			while (hash != nullptr) {
				auto prev = hash->prev;
				if (prev != nullptr) {		// The last hash is part of this object and was not allocated dynamically
					for (size_t i = 0; i != hash->capacity; ++i) {
						hash->entries[i].~ImplicitProducerKVP();
					}
					hash->~ImplicitProducerHash();
					(Traits::free)(hash);
				}
				hash = prev;
			}
		}
		
		// Destroy global free list
		auto block = freeList.head_unsafe();
		while (block != nullptr) {
			auto next = block->freeListNext.load(std::memory_order_relaxed);
			if (block->dynamicallyAllocated) {
				destroy(block);
			}
			block = next;
		}
		
		// Destroy initial free list
		destroy_array(initialBlockPool, initialBlockPoolSize);
	}
	
	// Disable copying and copy assignment
	ConcurrentQueue(ConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
	ConcurrentQueue& operator=(ConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
	
	// Moving is supported, but note that it is *not* a thread-safe operation.
	// Nobody can use the queue while it's being moved, and the memory effects
	// of that move must be propagated to other threads before they can use it.
	// Note: When a queue is moved, its tokens are still valid but can only be
	// used with the destination queue (i.e. semantically they are moved along
	// with the queue itself).
	ConcurrentQueue(ConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
		: producerListTail(other.producerListTail.load(std::memory_order_relaxed)),
		producerCount(other.producerCount.load(std::memory_order_relaxed)),
		initialBlockPoolIndex(other.initialBlockPoolIndex.load(std::memory_order_relaxed)),
		initialBlockPool(other.initialBlockPool),
		initialBlockPoolSize(other.initialBlockPoolSize),
		freeList(std::move(other.freeList)),
		nextExplicitConsumerId(other.nextExplicitConsumerId.load(std::memory_order_relaxed)),
		globalExplicitConsumerOffset(other.globalExplicitConsumerOffset.load(std::memory_order_relaxed))
	{
		// Move the other one into this, and leave the other one as an empty queue
		implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
		populate_initial_implicit_producer_hash();
		swap_implicit_producer_hashes(other);
		
		other.producerListTail.store(nullptr, std::memory_order_relaxed);
		other.producerCount.store(0, std::memory_order_relaxed);
		other.nextExplicitConsumerId.store(0, std::memory_order_relaxed);
		other.globalExplicitConsumerOffset.store(0, std::memory_order_relaxed);
		
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
		explicitProducers.store(other.explicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
		other.explicitProducers.store(nullptr, std::memory_order_relaxed);
		implicitProducers.store(other.implicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
		other.implicitProducers.store(nullptr, std::memory_order_relaxed);
#endif
		
		other.initialBlockPoolIndex.store(0, std::memory_order_relaxed);
		other.initialBlockPoolSize = 0;
		other.initialBlockPool = nullptr;
		
		reown_producers();
	}
	
	inline ConcurrentQueue& operator=(ConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
	{
		return swap_internal(other);
	}
	
	// Swaps this queue's state with the other's. Not thread-safe.
	// Swapping two queues does not invalidate their tokens; however, the
	// tokens that were created for one queue must be used with
	// only the swapped queue (i.e. the tokens are tied to the
	// queue's movable state, not the object itself).
	inline void swap(ConcurrentQueue& other) MOODYCAMEL_NOEXCEPT
	{
		swap_internal(other);
	}
	
private:
	ConcurrentQueue& swap_internal(ConcurrentQueue& other)
	{
		if (this == &other) {
			return *this;
		}
		
		details::swap_relaxed(producerListTail, other.producerListTail);
		details::swap_relaxed(producerCount, other.producerCount);
		details::swap_relaxed(initialBlockPoolIndex, other.initialBlockPoolIndex);
		std::swap(initialBlockPool, other.initialBlockPool);
		std::swap(initialBlockPoolSize, other.initialBlockPoolSize);
		freeList.swap(other.freeList);
		details::swap_relaxed(nextExplicitConsumerId, other.nextExplicitConsumerId);
		details::swap_relaxed(globalExplicitConsumerOffset, other.globalExplicitConsumerOffset);
		
		swap_implicit_producer_hashes(other);
		
		reown_producers();
		other.reown_producers();
		
#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
		details::swap_relaxed(explicitProducers, other.explicitProducers);
		details::swap_relaxed(implicitProducers, other.implicitProducers);
#endif
		
		return *this;
	}
	
public:
	// Enqueues a single item (by copying it).
	// Allocates memory if required. Only fails if memory allocation fails (or implicit
	// production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
	// or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
	// Thread-safe.
	inline bool enqueue(T const& item)
	{
		if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
		return inner_enqueue<CanAlloc>(item);
	}
	
	// Enqueues a single item (by moving it, if possible).
	// Allocates memory if required. Only fails if memory allocation fails (or implicit
	// production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
	// or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
	// Thread-safe.
	inline bool enqueue(T&& item)
	{
		if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
		return inner_enqueue<CanAlloc>(std::move(item));
	}
	
	// Enqueues a single item (by copying it) using an explicit producer token.
	// Allocates memory if required. Only fails if memory allocation fails (or
	// Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
	// Thread-safe.
	inline bool enqueue(producer_token_t const& token, T const& item)
	{
		return inner_enqueue<CanAlloc>(token, item);
	}
	
	// Enqueues a single item (by moving it, if possible) using an explicit producer token.
	// Allocates memory if required. Only fails if memory allocation fails (or
	// Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
	// Thread-safe.
	inline bool enqueue(producer_token_t const& token, T&& item)
	{
		return inner_enqueue<CanAlloc>(token, std::move(item));
	}
	
	// Enqueues several items.
	// Allocates memory if required. Only fails if memory allocation fails (or
	// implicit production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
	// is 0, or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
	// Note: Use std::make_move_iterator if the elements should be moved instead of copied.
	// Thread-safe.
	template<typename It>
	bool enqueue_bulk(It itemFirst, size_t count)
	{
		if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
		return inner_enqueue_bulk<CanAlloc>(itemFirst, count);
	}
	
	// Enqueues several items using an explicit producer token.
	// Allocates memory if required. Only fails if memory allocation fails
	// (or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
	// Note: Use std::make_move_iterator if the elements should be moved
	// instead of copied.
	// Thread-safe.
	template<typename It>
	bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
	{
		return inner_enqueue_bulk<CanAlloc>(token, itemFirst, count);
	}
	
	// Enqueues a single item (by copying it).
	// Does not allocate memory (except for the one-time implicit producer).
	// Fails if not enough room to enqueue (or implicit production is disabled
	// because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
	// Thread-safe.
	inline bool try_enqueue(T const& item)
	{
		if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
		return inner_enqueue<CannotAlloc>(item);
	}
	
	// Enqueues a single item (by moving it, if possible).
	// Does not allocate memory (except for one-time implicit producer).
	// Fails if not enough room to enqueue (or implicit production is
	// disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
	// Thread-safe.
	inline bool try_enqueue(T&& item)
	{
		if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
		return inner_enqueue<CannotAlloc>(std::move(item));
	}
	
	// Enqueues a single item (by copying it) using an explicit producer token.
	// Does not allocate memory. Fails if not enough room to enqueue.
	// Thread-safe.
	inline bool try_enqueue(producer_token_t const& token, T const& item)
	{
		return inner_enqueue<CannotAlloc>(token, item);
	}
	
	// Enqueues a single item (by moving it, if possible) using an explicit producer token.
	// Does not allocate memory. Fails if not enough room to enqueue.
	// Thread-safe.
	inline bool try_enqueue(producer_token_t const& token, T&& item)
	{
		return inner_enqueue<CannotAlloc>(token, std::move(item));
	}
	
	// Enqueues several items.
	// Does not allocate memory (except for one-time implicit producer).
	// Fails if not enough room to enqueue (or implicit production is
	// disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
	// Note: Use std::make_move_iterator if the elements should be moved
	// instead of copied.
	// Thread-safe.
	template<typename It>
	bool try_enqueue_bulk(It itemFirst, size_t count)
	{
		if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
		return inner_enqueue_bulk<CannotAlloc>(itemFirst, count);
	}
	
	// Enqueues several items using an explicit producer token.
	// Does not allocate memory. Fails if not enough room to enqueue.
	// Note: Use std::make_move_iterator if the elements should be moved
	// instead of copied.
	// Thread-safe.
	template<typename It>
	bool try_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
	{
		return inner_enqueue_bulk<CannotAlloc>(token, itemFirst, count);
	}
	
	
	
	// Attempts to dequeue from the queue.
	// Returns false if all producer streams appeared empty at the time they
	// were checked (so, the queue is likely but not guaranteed to be empty).
	// Never allocates. Thread-safe.
	template<typename U>
	bool try_dequeue(U& item)
	{
		// Instead of simply trying each producer in turn (which could cause needless contention on the first
		// producer), we score them heuristically.
		size_t nonEmptyCount = 0;
		ProducerBase* best = nullptr;
		size_t bestSize = 0;
		for (auto ptr = producerListTail.load(std::memory_order_acquire); nonEmptyCount < 3 && ptr != nullptr; ptr = ptr->next_prod()) {
			auto size = ptr->size_approx();
			if (size > 0) {
				if (size > bestSize) {
					bestSize = size;
					best = ptr;
				}
				++nonEmptyCount;
			}
		}
		
		// If there was at least one non-empty queue but it appears empty at the time
		// we try to dequeue from it, we need to make sure every queue's been tried
		if (nonEmptyCount > 0) {
			if (details::likely(best->dequeue(item))) {
				return true;
			}
			for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
				if (ptr != best && ptr->dequeue(item)) {
					return true;
				}
			}
		}
		return false;
	}
	
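	// Example (illustrative sketch; the element type and variable names are arbitrary):
	// basic token-free usage from any number of threads looks like this:
	//
	//    moodycamel::ConcurrentQueue<int> q;
	//    q.enqueue(25);               // allocates on demand; fails only if allocation fails
	//    int item;
	//    if (q.try_dequeue(item)) {
	//        // item == 25 here (unless another consumer grabbed it first)
	//    }
	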
	// Attempts to dequeue from the queue.
	// Returns false if all producer streams appeared empty at the time they
	// were checked (so, the queue is likely but not guaranteed to be empty).
	// This differs from the try_dequeue(item) method in that this one does
	// not attempt to reduce contention by interleaving the order that producer
	// streams are dequeued from. So, using this method can reduce overall throughput
	// under contention, but will give more predictable results in single-threaded
	// consumer scenarios. This is mostly only useful for internal unit tests.
	// Never allocates. Thread-safe.
	template<typename U>
	bool try_dequeue_non_interleaved(U& item)
	{
		for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
			if (ptr->dequeue(item)) {
				return true;
			}
		}
		return false;
	}
	
	// Attempts to dequeue from the queue using an explicit consumer token.
	// Returns false if all producer streams appeared empty at the time they
	// were checked (so, the queue is likely but not guaranteed to be empty).
	// Never allocates. Thread-safe.
	template<typename U>
	bool try_dequeue(consumer_token_t& token, U& item)
	{
		// The idea is roughly as follows:
		// Every 256 items from one producer, make everyone rotate (increase the global offset) -> this means the highest efficiency consumer dictates the rotation speed of everyone else, more or less
		// If you see that the global offset has changed, you must reset your consumption counter and move to your designated place
		// If there are no items where you're supposed to be, keep moving until you find a producer with some items
		// If the global offset has not changed but you've run out of items to consume, move over from your current position until you find a producer with something in it
		
		if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
			if (!update_current_producer_after_rotation(token)) {
				return false;
			}
		}
		
		// If there was at least one non-empty queue but it appears empty at the time
		// we try to dequeue from it, we need to make sure every queue's been tried
		if (static_cast<ProducerBase*>(token.currentProducer)->dequeue(item)) {
			if (++token.itemsConsumedFromCurrent == EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
				globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
			}
			return true;
		}
		
		auto tail = producerListTail.load(std::memory_order_acquire);
		auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
		if (ptr == nullptr) {
			ptr = tail;
		}
		while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
			if (ptr->dequeue(item)) {
				token.currentProducer = ptr;
				token.itemsConsumedFromCurrent = 1;
				return true;
			}
			ptr = ptr->next_prod();
			if (ptr == nullptr) {
				ptr = tail;
			}
		}
		return false;
	}
	
	// Attempts to dequeue several elements from the queue.
	// Returns the number of items actually dequeued.
	// Returns 0 if all producer streams appeared empty at the time they
	// were checked (so, the queue is likely but not guaranteed to be empty).
	// Never allocates. Thread-safe.
	template<typename It>
	size_t try_dequeue_bulk(It itemFirst, size_t max)
	{
		size_t count = 0;
		for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
			count += ptr->dequeue_bulk(itemFirst, max - count);
			if (count == max) {
				break;
			}
		}
		return count;
	}
	
	// Attempts to dequeue several elements from the queue using an explicit consumer token.
	// Returns the number of items actually dequeued.
	// Returns 0 if all producer streams appeared empty at the time they
	// were checked (so, the queue is likely but not guaranteed to be empty).
	// Never allocates. Thread-safe.
	template<typename It>
	size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
	{
		if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
			if (!update_current_producer_after_rotation(token)) {
				return 0;
			}
		}
		
		size_t count = static_cast<ProducerBase*>(token.currentProducer)->dequeue_bulk(itemFirst, max);
		if (count == max) {
			if ((token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(max)) >= EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
				globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
			}
			return max;
		}
		token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(count);
		max -= count;
		
		auto tail = producerListTail.load(std::memory_order_acquire);
		auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
		if (ptr == nullptr) {
			ptr = tail;
		}
		while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
			auto dequeued = ptr->dequeue_bulk(itemFirst, max);
			count += dequeued;
			if (dequeued != 0) {
				token.currentProducer = ptr;
				token.itemsConsumedFromCurrent = static_cast<std::uint32_t>(dequeued);
			}
			if (dequeued == max) {
				break;
			}
			max -= dequeued;
			ptr = ptr->next_prod();
			if (ptr == nullptr) {
				ptr = tail;
			}
		}
		return count;
	}
	
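	// Example (illustrative sketch; `q`, `process`, and the buffer size are hypothetical):
	// a consumer can drain the queue in batches with the token-free bulk dequeue above:
	//
	//    int buffer[64];
	//    size_t n;
	//    while ((n = q.try_dequeue_bulk(buffer, 64)) != 0) {
	//        for (size_t i = 0; i != n; ++i) {
	//            process(buffer[i]);
	//        }
	//    }
	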
	// Attempts to dequeue from a specific producer's inner queue.
	// If you happen to know which producer you want to dequeue from, this
	// is significantly faster than using the general-case try_dequeue methods.
	// Returns false if the producer's queue appeared empty at the time it
	// was checked (so, the queue is likely but not guaranteed to be empty).
	// Never allocates. Thread-safe.
	template<typename U>
	inline bool try_dequeue_from_producer(producer_token_t const& producer, U& item)
	{
		return static_cast<ExplicitProducer*>(producer.producer)->dequeue(item);
	}
	
	// Attempts to dequeue several elements from a specific producer's inner queue.
	// Returns the number of items actually dequeued.
	// If you happen to know which producer you want to dequeue from, this
	// is significantly faster than using the general-case try_dequeue methods.
	// Returns 0 if the producer's queue appeared empty at the time it
	// was checked (so, the queue is likely but not guaranteed to be empty).
	// Never allocates. Thread-safe.
	template<typename It>
	inline size_t try_dequeue_bulk_from_producer(producer_token_t const& producer, It itemFirst, size_t max)
	{
		return static_cast<ExplicitProducer*>(producer.producer)->dequeue_bulk(itemFirst, max);
	}
	
	
	// Returns an estimate of the total number of elements currently in the queue. This
	// estimate is only accurate if the queue has completely stabilized before it is called
	// (i.e. all enqueue and dequeue operations have completed and their memory effects are
	// visible on the calling thread, and no further operations start while this method is
	// being called).
	// Thread-safe.
	size_t size_approx() const
	{
		size_t size = 0;
		for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
			size += ptr->size_approx();
		}
		return size;
	}
	
	
	// Returns true if the underlying atomic variables used by
	// the queue are lock-free (they should be on most platforms).
	// Thread-safe.
	static bool is_lock_free()
	{
		return
			details::static_is_lock_free<bool>::value == 2 &&
			details::static_is_lock_free<size_t>::value == 2 &&
			details::static_is_lock_free<std::uint32_t>::value == 2 &&
			details::static_is_lock_free<index_t>::value == 2 &&
			details::static_is_lock_free<void*>::value == 2 &&
			details::static_is_lock_free<typename details::thread_id_converter<details::thread_id_t>::thread_id_numeric_size_t>::value == 2;
	}
	
	
private:
	friend struct ProducerToken;
	friend struct ConsumerToken;
	friend struct ExplicitProducer;
	friend class ConcurrentQueueTests;
	
	enum AllocationMode { CanAlloc, CannotAlloc };
	
	
	///////////////////////////////
	// Queue methods
	///////////////////////////////
	
	template<AllocationMode canAlloc, typename U>
	inline bool inner_enqueue(producer_token_t const& token, U&& element)
	{
		return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
	}
	
	template<AllocationMode canAlloc, typename U>
	inline bool inner_enqueue(U&& element)
	{
		auto producer = get_or_add_implicit_producer();
		return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
	}
	
	template<AllocationMode canAlloc, typename It>
	inline bool inner_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
	{
		return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue_bulk<canAlloc>(itemFirst, count);
	}
	
	template<AllocationMode canAlloc, typename It>
	inline bool inner_enqueue_bulk(It itemFirst, size_t count)
	{
		auto producer = get_or_add_implicit_producer();
		return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue_bulk<canAlloc>(itemFirst, count);
	}
	
	inline bool update_current_producer_after_rotation(consumer_token_t& token)
	{
		// Ah, there's been a rotation, figure out where we should be!
1300 auto tail = producerListTail.load(std::memory_order_acquire); 1301 if (token.desiredProducer == nullptr && tail == nullptr) { 1302 return false; 1303 } 1304 auto prodCount = producerCount.load(std::memory_order_relaxed); 1305 auto globalOffset = globalExplicitConsumerOffset.load(std::memory_order_relaxed); 1306 if (details::unlikely(token.desiredProducer == nullptr)) { 1307 // Aha, first time we're dequeueing anything. 1308 // Figure out our local position 1309 // Note: offset is from start, not end, but we're traversing from end -- subtract from count first 1310 std::uint32_t offset = prodCount - 1 - (token.initialOffset % prodCount); 1311 token.desiredProducer = tail; 1312 for (std::uint32_t i = 0; i != offset; ++i) { 1313 token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod(); 1314 if (token.desiredProducer == nullptr) { 1315 token.desiredProducer = tail; 1316 } 1317 } 1318 } 1319 1320 std::uint32_t delta = globalOffset - token.lastKnownGlobalOffset; 1321 if (delta >= prodCount) { 1322 delta = delta % prodCount; 1323 } 1324 for (std::uint32_t i = 0; i != delta; ++i) { 1325 token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod(); 1326 if (token.desiredProducer == nullptr) { 1327 token.desiredProducer = tail; 1328 } 1329 } 1330 1331 token.lastKnownGlobalOffset = globalOffset; 1332 token.currentProducer = token.desiredProducer; 1333 token.itemsConsumedFromCurrent = 0; 1334 return true; 1335 } 1336 1337 1338 /////////////////////////// 1339 // Free list 1340 /////////////////////////// 1341 1342 template <typename N> 1343 struct FreeListNode 1344 { 1345 FreeListNode() : freeListRefs(0), freeListNext(nullptr) { } 1346 1347 std::atomic<std::uint32_t> freeListRefs; 1348 std::atomic<N*> freeListNext; 1349 }; 1350 1351 // A simple CAS-based lock-free free list. Not the fastest thing in the world under heavy contention, but 1352 // simple and correct (assuming nodes are never freed until after the free list is destroyed), and fairly 1353 // speedy under low contention. 1354 template<typename N> // N must inherit FreeListNode or have the same fields (and initialization of them) 1355 struct FreeList 1356 { 1357 FreeList() : freeListHead(nullptr) { } 1358 FreeList(FreeList&& other) : freeListHead(other.freeListHead.load(std::memory_order_relaxed)) { other.freeListHead.store(nullptr, std::memory_order_relaxed); } 1359 void swap(FreeList& other) { details::swap_relaxed(freeListHead, other.freeListHead); } 1360 1361 FreeList(FreeList const&) MOODYCAMEL_DELETE_FUNCTION; 1362 FreeList& operator=(FreeList const&) MOODYCAMEL_DELETE_FUNCTION; 1363 1364 inline void add(N* node) 1365 { 1366 #if MCDBGQ_NOLOCKFREE_FREELIST 1367 debug::DebugLock lock(mutex); 1368 #endif 1369 // We know that the should-be-on-freelist bit is 0 at this point, so it's safe to 1370 // set it using a fetch_add 1371 if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST, std::memory_order_acq_rel) == 0) { 1372 // Oh look! We were the last ones referencing this node, and we know 1373 // we want to add it to the free list, so let's do it! 
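// (Illustration, not normative: freeListRefs packs two things into one 32-bit value -- the
// SHOULD_BE_ON_FREELIST flag (0x80000000) and a reference count in the low bits (REFS_MASK).
// A value of 0x80000001, say, means one reference is still outstanding and the node should be
// pushed onto the free list as soon as that last reference is dropped.)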
1374 add_knowing_refcount_is_zero(node); 1375 } 1376 } 1377 1378 inline N* try_get() 1379 { 1380 #if MCDBGQ_NOLOCKFREE_FREELIST 1381 debug::DebugLock lock(mutex); 1382 #endif 1383 auto head = freeListHead.load(std::memory_order_acquire); 1384 while (head != nullptr) { 1385 auto prevHead = head; 1386 auto refs = head->freeListRefs.load(std::memory_order_relaxed); 1387 if ((refs & REFS_MASK) == 0 || !head->freeListRefs.compare_exchange_strong(refs, refs + 1, std::memory_order_acquire, std::memory_order_relaxed)) { 1388 head = freeListHead.load(std::memory_order_acquire); 1389 continue; 1390 } 1391 1392 // Good, reference count has been incremented (it wasn't at zero), which means we can read the 1393 // next and not worry about it changing between now and the time we do the CAS 1394 auto next = head->freeListNext.load(std::memory_order_relaxed); 1395 if (freeListHead.compare_exchange_strong(head, next, std::memory_order_acquire, std::memory_order_relaxed)) { 1396 // Yay, got the node. This means it was on the list, which means shouldBeOnFreeList must be false no 1397 // matter the refcount (because nobody else knows it's been taken off yet, it can't have been put back on). 1398 assert((head->freeListRefs.load(std::memory_order_relaxed) & SHOULD_BE_ON_FREELIST) == 0); 1399 1400 // Decrease refcount twice, once for our ref, and once for the list's ref 1401 head->freeListRefs.fetch_add(-2, std::memory_order_release); 1402 return head; 1403 } 1404 1405 // OK, the head must have changed on us, but we still need to decrease the refcount we increased. 1406 // Note that we don't need to release any memory effects, but we do need to ensure that the reference 1407 // count decrement happens-after the CAS on the head. 1408 refs = prevHead->freeListRefs.fetch_add(-1, std::memory_order_acq_rel); 1409 if (refs == SHOULD_BE_ON_FREELIST + 1) { 1410 add_knowing_refcount_is_zero(prevHead); 1411 } 1412 } 1413 1414 return nullptr; 1415 } 1416 1417 // Useful for traversing the list when there's no contention (e.g. to destroy remaining nodes) 1418 N* head_unsafe() const { return freeListHead.load(std::memory_order_relaxed); } 1419 1420 private: 1421 inline void add_knowing_refcount_is_zero(N* node) 1422 { 1423 // Since the refcount is zero, and nobody can increase it once it's zero (except us, and we run 1424 // only one copy of this method per node at a time, i.e. the single thread case), then we know 1425 // we can safely change the next pointer of the node; however, once the refcount is back above 1426 // zero, then other threads could increase it (happens under heavy contention, when the refcount 1427 // goes to zero in between a load and a refcount increment of a node in try_get, then back up to 1428 // something non-zero, then the refcount increment is done by the other thread) -- so, if the CAS 1429 // to add the node to the actual list fails, decrease the refcount and leave the add operation to 1430 // the next thread who puts the refcount back at zero (which could be us, hence the loop). 
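// (Sketch of the mechanics that follow, for orientation only: publish refs = 1 (the list's own
// reference), attempt to CAS the node in as the new head, and on failure hand the add back by
// adding SHOULD_BE_ON_FREELIST - 1; if that fetch_add returns 1, no other thread took a reference
// in the meantime, so we loop and retry the push ourselves -- otherwise whoever drops the count
// back to zero will do it.)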
1431 auto head = freeListHead.load(std::memory_order_relaxed); 1432 while (true) { 1433 node->freeListNext.store(head, std::memory_order_relaxed); 1434 node->freeListRefs.store(1, std::memory_order_release); 1435 if (!freeListHead.compare_exchange_strong(head, node, std::memory_order_release, std::memory_order_relaxed)) { 1436 // Hmm, the add failed, but we can only try again when the refcount goes back to zero 1437 if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1, std::memory_order_release) == 1) { 1438 continue; 1439 } 1440 } 1441 return; 1442 } 1443 } 1444 1445 private: 1446 // Implemented like a stack, but where node order doesn't matter (nodes are inserted out of order under contention) 1447 std::atomic<N*> freeListHead; 1448 1449 static const std::uint32_t REFS_MASK = 0x7FFFFFFF; 1450 static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000; 1451 1452 #if MCDBGQ_NOLOCKFREE_FREELIST 1453 debug::DebugMutex mutex; 1454 #endif 1455 }; 1456 1457 1458 /////////////////////////// 1459 // Block 1460 /////////////////////////// 1461 1462 enum InnerQueueContext { implicit_context = 0, explicit_context = 1 }; 1463 1464 struct Block 1465 { 1466 Block() 1467 : next(nullptr), elementsCompletelyDequeued(0), freeListRefs(0), freeListNext(nullptr), shouldBeOnFreeList(false), dynamicallyAllocated(true) 1468 { 1469 #if MCDBGQ_TRACKMEM 1470 owner = nullptr; 1471 #endif 1472 } 1473 1474 template<InnerQueueContext context> 1475 inline bool is_empty() const 1476 { 1477 if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) { 1478 // Check flags 1479 for (size_t i = 0; i < BLOCK_SIZE; ++i) { 1480 if (!emptyFlags[i].load(std::memory_order_relaxed)) { 1481 return false; 1482 } 1483 } 1484 1485 // Aha, empty; make sure we have all other memory effects that happened before the empty flags were set 1486 std::atomic_thread_fence(std::memory_order_acquire); 1487 return true; 1488 } 1489 else { 1490 // Check counter 1491 if (elementsCompletelyDequeued.load(std::memory_order_relaxed) == BLOCK_SIZE) { 1492 std::atomic_thread_fence(std::memory_order_acquire); 1493 return true; 1494 } 1495 assert(elementsCompletelyDequeued.load(std::memory_order_relaxed) <= BLOCK_SIZE); 1496 return false; 1497 } 1498 } 1499 1500 // Returns true if the block is now empty (does not apply in explicit context) 1501 template<InnerQueueContext context> 1502 inline bool set_empty(index_t i) 1503 { 1504 if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) { 1505 // Set flag 1506 assert(!emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].load(std::memory_order_relaxed)); 1507 emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].store(true, std::memory_order_release); 1508 return false; 1509 } 1510 else { 1511 // Increment counter 1512 auto prevVal = elementsCompletelyDequeued.fetch_add(1, std::memory_order_release); 1513 assert(prevVal < BLOCK_SIZE); 1514 return prevVal == BLOCK_SIZE - 1; 1515 } 1516 } 1517 1518 // Sets multiple contiguous item statuses to 'empty' (assumes no wrapping and count > 0). 1519 // Returns true if the block is now empty (does not apply in explicit context). 
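// (Illustrative note: in the flag-based case (explicit context with BLOCK_SIZE at or below
// EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) the flags are stored in reverse -- item index i maps to
// emptyFlags[BLOCK_SIZE - 1 - (i & (BLOCK_SIZE - 1))] -- so a run of consecutively dequeued items
// becomes a run of consecutive flags, which is what the arithmetic below relies on.)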
1520 template<InnerQueueContext context> 1521 inline bool set_many_empty(index_t i, size_t count) 1522 { 1523 if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) { 1524 // Set flags 1525 std::atomic_thread_fence(std::memory_order_release); 1526 i = BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1)) - count + 1; 1527 for (size_t j = 0; j != count; ++j) { 1528 assert(!emptyFlags[i + j].load(std::memory_order_relaxed)); 1529 emptyFlags[i + j].store(true, std::memory_order_relaxed); 1530 } 1531 return false; 1532 } 1533 else { 1534 // Increment counter 1535 auto prevVal = elementsCompletelyDequeued.fetch_add(count, std::memory_order_release); 1536 assert(prevVal + count <= BLOCK_SIZE); 1537 return prevVal + count == BLOCK_SIZE; 1538 } 1539 } 1540 1541 template<InnerQueueContext context> 1542 inline void set_all_empty() 1543 { 1544 if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) { 1545 // Set all flags 1546 for (size_t i = 0; i != BLOCK_SIZE; ++i) { 1547 emptyFlags[i].store(true, std::memory_order_relaxed); 1548 } 1549 } 1550 else { 1551 // Reset counter 1552 elementsCompletelyDequeued.store(BLOCK_SIZE, std::memory_order_relaxed); 1553 } 1554 } 1555 1556 template<InnerQueueContext context> 1557 inline void reset_empty() 1558 { 1559 if (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) { 1560 // Reset flags 1561 for (size_t i = 0; i != BLOCK_SIZE; ++i) { 1562 emptyFlags[i].store(false, std::memory_order_relaxed); 1563 } 1564 } 1565 else { 1566 // Reset counter 1567 elementsCompletelyDequeued.store(0, std::memory_order_relaxed); 1568 } 1569 } 1570 1571 inline T* operator[](index_t idx) MOODYCAMEL_NOEXCEPT { return static_cast<T*>(static_cast<void*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); } 1572 inline T const* operator[](index_t idx) const MOODYCAMEL_NOEXCEPT { return static_cast<T const*>(static_cast<void const*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); } 1573 1574 private: 1575 // IMPORTANT: This must be the first member in Block, so that if T depends on the alignment of 1576 // addresses returned by malloc, that alignment will be preserved. Apparently clang actually 1577 // generates code that uses this assumption for AVX instructions in some cases. Ideally, we 1578 // should also align Block to the alignment of T in case it's higher than malloc's 16-byte 1579 // alignment, but this is hard to do in a cross-platform way. Assert for this case: 1580 static_assert(std::alignment_of<T>::value <= std::alignment_of<details::max_align_t>::value, "The queue does not support super-aligned types at this time"); 1581 // Additionally, we need the alignment of Block itself to be a multiple of max_align_t since 1582 // otherwise the appropriate padding will not be added at the end of Block in order to make 1583 // arrays of Blocks all be properly aligned (not just the first one). We use a union to force 1584 // this. 1585 union { 1586 char elements[sizeof(T) * BLOCK_SIZE]; 1587 details::max_align_t dummy; 1588 }; 1589 public: 1590 Block* next; 1591 std::atomic<size_t> elementsCompletelyDequeued; 1592 std::atomic<bool> emptyFlags[BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD ? 
BLOCK_SIZE : 1]; 1593 public: 1594 std::atomic<std::uint32_t> freeListRefs; 1595 std::atomic<Block*> freeListNext; 1596 std::atomic<bool> shouldBeOnFreeList; 1597 bool dynamicallyAllocated; // Perhaps a better name for this would be 'isNotPartOfInitialBlockPool' 1598 1599 #if MCDBGQ_TRACKMEM 1600 void* owner; 1601 #endif 1602 }; 1603 static_assert(std::alignment_of<Block>::value >= std::alignment_of<details::max_align_t>::value, "Internal error: Blocks must be at least as aligned as the type they are wrapping"); 1604 1605 1606 #if MCDBGQ_TRACKMEM 1607 public: 1608 struct MemStats; 1609 private: 1610 #endif 1611 1612 /////////////////////////// 1613 // Producer base 1614 /////////////////////////// 1615 1616 struct ProducerBase : public details::ConcurrentQueueProducerTypelessBase 1617 { 1618 ProducerBase(ConcurrentQueue* parent, bool isExplicit) : 1619 tailIndex(0), 1620 headIndex(0), 1621 dequeueOptimisticCount(0), 1622 dequeueOvercommit(0), 1623 tailBlock(nullptr), 1624 isExplicit(isExplicit), 1625 parent(parent) 1626 { 1627 } 1628 1629 virtual ~ProducerBase() { }; 1630 1631 template<typename U> 1632 inline bool dequeue(U& element) 1633 { 1634 if (isExplicit) { 1635 return static_cast<ExplicitProducer*>(this)->dequeue(element); 1636 } 1637 else { 1638 return static_cast<ImplicitProducer*>(this)->dequeue(element); 1639 } 1640 } 1641 1642 template<typename It> 1643 inline size_t dequeue_bulk(It& itemFirst, size_t max) 1644 { 1645 if (isExplicit) { 1646 return static_cast<ExplicitProducer*>(this)->dequeue_bulk(itemFirst, max); 1647 } 1648 else { 1649 return static_cast<ImplicitProducer*>(this)->dequeue_bulk(itemFirst, max); 1650 } 1651 } 1652 1653 inline ProducerBase* next_prod() const { return static_cast<ProducerBase*>(next); } 1654 1655 inline size_t size_approx() const 1656 { 1657 auto tail = tailIndex.load(std::memory_order_relaxed); 1658 auto head = headIndex.load(std::memory_order_relaxed); 1659 return details::circular_less_than(head, tail) ? static_cast<size_t>(tail - head) : 0; 1660 } 1661 1662 inline index_t getTail() const { return tailIndex.load(std::memory_order_relaxed); } 1663 protected: 1664 std::atomic<index_t> tailIndex; // Where to enqueue to next 1665 std::atomic<index_t> headIndex; // Where to dequeue from next 1666 1667 std::atomic<index_t> dequeueOptimisticCount; 1668 std::atomic<index_t> dequeueOvercommit; 1669 1670 Block* tailBlock; 1671 1672 public: 1673 bool isExplicit; 1674 ConcurrentQueue* parent; 1675 1676 protected: 1677 #if MCDBGQ_TRACKMEM 1678 friend struct MemStats; 1679 #endif 1680 }; 1681 1682 1683 /////////////////////////// 1684 // Explicit queue 1685 /////////////////////////// 1686 1687 struct ExplicitProducer : public ProducerBase 1688 { 1689 explicit ExplicitProducer(ConcurrentQueue* parent) : 1690 ProducerBase(parent, true), 1691 blockIndex(nullptr), 1692 pr_blockIndexSlotsUsed(0), 1693 pr_blockIndexSize(EXPLICIT_INITIAL_INDEX_SIZE >> 1), 1694 pr_blockIndexFront(0), 1695 pr_blockIndexEntries(nullptr), 1696 pr_blockIndexRaw(nullptr) 1697 { 1698 size_t poolBasedIndexSize = details::ceil_to_pow_2(parent->initialBlockPoolSize) >> 1; 1699 if (poolBasedIndexSize > pr_blockIndexSize) { 1700 pr_blockIndexSize = poolBasedIndexSize; 1701 } 1702 1703 new_block_index(0); // This creates an index with double the number of current entries, i.e. EXPLICIT_INITIAL_INDEX_SIZE 1704 } 1705 1706 ~ExplicitProducer() 1707 { 1708 // Destruct any elements not yet dequeued. 
1709 // Since we're in the destructor, we can assume all elements 1710 // are either completely dequeued or completely not (no halfways). 1711 if (this->tailBlock != nullptr) { // Note this means there must be a block index too 1712 // First find the block that's partially dequeued, if any 1713 Block* halfDequeuedBlock = nullptr; 1714 if ((this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) != 0) { 1715 // The head's not on a block boundary, meaning a block somewhere is partially dequeued 1716 // (or the head block is the tail block and was fully dequeued, but the head/tail are still not on a boundary) 1717 size_t i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & (pr_blockIndexSize - 1); 1718 while (details::circular_less_than<index_t>(pr_blockIndexEntries[i].base + BLOCK_SIZE, this->headIndex.load(std::memory_order_relaxed))) { 1719 i = (i + 1) & (pr_blockIndexSize - 1); 1720 } 1721 assert(details::circular_less_than<index_t>(pr_blockIndexEntries[i].base, this->headIndex.load(std::memory_order_relaxed))); 1722 halfDequeuedBlock = pr_blockIndexEntries[i].block; 1723 } 1724 1725 // Start at the head block (note the first line in the loop gives us the head from the tail on the first iteration) 1726 auto block = this->tailBlock; 1727 do { 1728 block = block->next; 1729 if (block->ConcurrentQueue::Block::template is_empty<explicit_context>()) { 1730 continue; 1731 } 1732 1733 size_t i = 0; // Offset into block 1734 if (block == halfDequeuedBlock) { 1735 i = static_cast<size_t>(this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)); 1736 } 1737 1738 // Walk through all the items in the block; if this is the tail block, we need to stop when we reach the tail index 1739 auto lastValidIndex = (this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 ? BLOCK_SIZE : static_cast<size_t>(this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)); 1740 while (i != BLOCK_SIZE && (block != this->tailBlock || i != lastValidIndex)) { 1741 (*block)[i++]->~T(); 1742 } 1743 } while (block != this->tailBlock); 1744 } 1745 1746 // Destroy all blocks that we own 1747 if (this->tailBlock != nullptr) { 1748 auto block = this->tailBlock; 1749 do { 1750 auto nextBlock = block->next; 1751 if (block->dynamicallyAllocated) { 1752 destroy(block); 1753 } 1754 else { 1755 this->parent->add_block_to_free_list(block); 1756 } 1757 block = nextBlock; 1758 } while (block != this->tailBlock); 1759 } 1760 1761 // Destroy the block indices 1762 auto header = static_cast<BlockIndexHeader*>(pr_blockIndexRaw); 1763 while (header != nullptr) { 1764 auto prev = static_cast<BlockIndexHeader*>(header->prev); 1765 header->~BlockIndexHeader(); 1766 (Traits::free)(header); 1767 header = prev; 1768 } 1769 } 1770 1771 template<AllocationMode allocMode, typename U> 1772 inline bool enqueue(U&& element) 1773 { 1774 index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed); 1775 index_t newTailIndex = 1 + currentTailIndex; 1776 if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) { 1777 // We reached the end of a block, start a new one 1778 auto startBlock = this->tailBlock; 1779 auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed; 1780 if (this->tailBlock != nullptr && this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) { 1781 // We can re-use the block ahead of us, it's empty! 
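// (For orientation: the blocks owned by this producer form a circular linked list with tailBlock
// pointing at the newest block, so tailBlock->next is the oldest one; if consumers have completely
// drained it, we can recycle it in place rather than requisition a fresh block from the parent's pool.)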
1782 this->tailBlock = this->tailBlock->next; 1783 this->tailBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>(); 1784 1785 // We'll put the block on the block index (guaranteed to be room since we're conceptually removing the 1786 // last block from it first -- except instead of removing then adding, we can just overwrite). 1787 // Note that there must be a valid block index here, since even if allocation failed in the ctor, 1788 // it would have been re-attempted when adding the first block to the queue; since there is such 1789 // a block, a block index must have been successfully allocated. 1790 } 1791 else { 1792 // Whatever head value we see here is >= the last value we saw here (relatively), 1793 // and <= its current value. Since we have the most recent tail, the head must be 1794 // <= to it. 1795 auto head = this->headIndex.load(std::memory_order_relaxed); 1796 assert(!details::circular_less_than<index_t>(currentTailIndex, head)); 1797 if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) 1798 || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) { 1799 // We can't enqueue in another block because there's not enough leeway -- the 1800 // tail could surpass the head by the time the block fills up! (Or we'll exceed 1801 // the size limit, if the second part of the condition was true.) 1802 return false; 1803 } 1804 // We're going to need a new block; check that the block index has room 1805 if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize) { 1806 // Hmm, the circular block index is already full -- we'll need 1807 // to allocate a new index. Note pr_blockIndexRaw can only be nullptr if 1808 // the initial allocation failed in the constructor. 1809 1810 if (allocMode == CannotAlloc || !new_block_index(pr_blockIndexSlotsUsed)) { 1811 return false; 1812 } 1813 } 1814 1815 // Insert a new block in the circular linked list 1816 auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>(); 1817 if (newBlock == nullptr) { 1818 return false; 1819 } 1820 #if MCDBGQ_TRACKMEM 1821 newBlock->owner = this; 1822 #endif 1823 newBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>(); 1824 if (this->tailBlock == nullptr) { 1825 newBlock->next = newBlock; 1826 } 1827 else { 1828 newBlock->next = this->tailBlock->next; 1829 this->tailBlock->next = newBlock; 1830 } 1831 this->tailBlock = newBlock; 1832 ++pr_blockIndexSlotsUsed; 1833 } 1834 1835 if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (nullptr) T(std::forward<U>(element)))) { 1836 // The constructor may throw. We want the element not to appear in the queue in 1837 // that case (without corrupting the queue): 1838 MOODYCAMEL_TRY { 1839 new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element)); 1840 } 1841 MOODYCAMEL_CATCH (...) { 1842 // Revert change to the current block, but leave the new block available 1843 // for next time 1844 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed; 1845 this->tailBlock = startBlock == nullptr ? 
                        this->tailBlock : startBlock;
                    MOODYCAMEL_RETHROW;
                }
            }
            else {
                (void)startBlock;
                (void)originalBlockIndexSlotsUsed;
            }

            // Add block to block index
            auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
            entry.base = currentTailIndex;
            entry.block = this->tailBlock;
            blockIndex.load(std::memory_order_relaxed)->front.store(pr_blockIndexFront, std::memory_order_release);
            pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);

            if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (nullptr) T(std::forward<U>(element)))) {
                this->tailIndex.store(newTailIndex, std::memory_order_release);
                return true;
            }
        }

        // Enqueue
        new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));

        this->tailIndex.store(newTailIndex, std::memory_order_release);
        return true;
    }

    template<typename U>
    bool dequeue(U& element)
    {
        auto tail = this->tailIndex.load(std::memory_order_relaxed);
        auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
        if (details::circular_less_than<index_t>(this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
            // Might be something to dequeue, let's give it a try

            // Note that this if is purely for performance purposes in the common case when the queue is
            // empty and the values are eventually consistent -- we may enter here spuriously.

            // Note that whatever the values of overcommit and tail are, they are not going to change (unless we
            // change them) and must be the same value at this point (inside the if) as when the if condition was
            // evaluated.

            // We insert an acquire fence here to synchronize-with the release upon incrementing dequeueOvercommit below.
            // This ensures that whatever value we loaded into overcommit, the load of dequeueOptimisticCount in
            // the fetch_add below will result in a value at least as recent as that (and therefore at least as large).
            // Note that I believe a compiler (signal) fence here would be sufficient due to the nature of fetch_add (all
            // read-modify-write operations are guaranteed to work on the latest value in the modification order), but
            // unfortunately that can't be shown to be correct using only the C++11 standard.
            // See http://stackoverflow.com/questions/18223161/what-are-the-c11-memory-ordering-guarantees-in-this-corner-case
            std::atomic_thread_fence(std::memory_order_acquire);

            // Increment optimistic counter, then check if it went over the boundary
            auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);

            // Note that since dequeueOvercommit must be <= dequeueOptimisticCount (because dequeueOvercommit is only ever
            // incremented after dequeueOptimisticCount -- this is enforced in the `else` block below), and since we now
            // have a version of dequeueOptimisticCount that is at least as recent as overcommit (due to the release upon
            // incrementing dequeueOvercommit and the acquire above that synchronizes with it), overcommit <= myDequeueCount.
            assert(overcommit <= myDequeueCount);

            // Note that we reload tail here in case it changed; it will be the same value as before or greater, since
            // this load is sequenced after (happens after) the earlier load above. This is supported by read-read
            // coherency (as defined in the standard), explained here: http://en.cppreference.com/w/cpp/atomic/memory_order
            tail = this->tailIndex.load(std::memory_order_acquire);
            if (details::likely(details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
                // Guaranteed to be at least one element to dequeue!

                // Get the index. Note that since there's guaranteed to be at least one element, this
                // will never exceed tail. We need to do an acquire-release fence here since it's possible
                // that whatever condition got us to this point was for an earlier enqueued element (that
                // we already see the memory effects for), but that by the time we increment somebody else
                // has incremented it, and we need to see the memory effects for *that* element, which in
                // such a case is necessarily visible on the thread that incremented it in the first
                // place with the more current condition (they must have acquired a tail that is at least
                // as recent).
                auto index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);


                // Determine which block the element is in

                auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
                auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);

                // We need to be careful here about subtracting and dividing because of index wrap-around.
                // When an index wraps, we need to preserve the sign of the offset when dividing it by the
                // block size (in order to get a correct signed block count offset in all cases):
                auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
                auto blockBaseIndex = index & ~static_cast<index_t>(BLOCK_SIZE - 1);
                auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(blockBaseIndex - headBase) / BLOCK_SIZE);
                auto block = localBlockIndex->entries[(localBlockIndexHead + offset) & (localBlockIndex->size - 1)].block;

                // Dequeue
                auto& el = *((*block)[index]);
                if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, element = std::move(el))) {
                    // Make sure the element is still fully dequeued and destroyed even if the assignment
                    // throws
                    struct Guard {
                        Block* block;
                        index_t index;

                        ~Guard()
                        {
                            (*block)[index]->~T();
                            block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
                        }
                    } guard = { block, index };

                    element = std::move(el);
                }
                else {
                    element = std::move(el);
                    el.~T();
                    block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
                }

                return true;
            }
            else {
                // Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent
                this->dequeueOvercommit.fetch_add(1, std::memory_order_release);  // Release so that the fetch_add on dequeueOptimisticCount is guaranteed to happen before this write
            }
        }

        return false;
    }

    template<AllocationMode allocMode, typename It>
    bool enqueue_bulk(It itemFirst, size_t count)
    {
        // First, we need to make sure we have enough room to enqueue all of the elements;
        // this means pre-allocating blocks and putting them in the block index (but only if
        // all the allocations succeeded).
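        // (Caller-side sketch, for illustration only and assuming the public enqueue_bulk overload
        // declared earlier in this header:
        //
        //     std::vector<int> batch(100, 7);
        //     bool ok = q.enqueue_bulk(ptok, batch.begin(), batch.size());
        //
        // where q is a ConcurrentQueue<int> and ptok a ProducerToken for it; that call forwards
        // through inner_enqueue_bulk above and ends up in this method.)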
1979 index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed); 1980 auto startBlock = this->tailBlock; 1981 auto originalBlockIndexFront = pr_blockIndexFront; 1982 auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed; 1983 1984 Block* firstAllocatedBlock = nullptr; 1985 1986 // Figure out how many blocks we'll need to allocate, and do so 1987 size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)); 1988 index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1); 1989 if (blockBaseDiff > 0) { 1990 // Allocate as many blocks as possible from ahead 1991 while (blockBaseDiff > 0 && this->tailBlock != nullptr && this->tailBlock->next != firstAllocatedBlock && this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) { 1992 blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE); 1993 currentTailIndex += static_cast<index_t>(BLOCK_SIZE); 1994 1995 this->tailBlock = this->tailBlock->next; 1996 firstAllocatedBlock = firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock; 1997 1998 auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront]; 1999 entry.base = currentTailIndex; 2000 entry.block = this->tailBlock; 2001 pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1); 2002 } 2003 2004 // Now allocate as many blocks as necessary from the block pool 2005 while (blockBaseDiff > 0) { 2006 blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE); 2007 currentTailIndex += static_cast<index_t>(BLOCK_SIZE); 2008 2009 auto head = this->headIndex.load(std::memory_order_relaxed); 2010 assert(!details::circular_less_than<index_t>(currentTailIndex, head)); 2011 bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head)); 2012 if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize || full) { 2013 if (allocMode == CannotAlloc || full || !new_block_index(originalBlockIndexSlotsUsed)) { 2014 // Failed to allocate, undo changes (but keep injected blocks) 2015 pr_blockIndexFront = originalBlockIndexFront; 2016 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed; 2017 this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock; 2018 return false; 2019 } 2020 2021 // pr_blockIndexFront is updated inside new_block_index, so we need to 2022 // update our fallback value too (since we keep the new index even if we 2023 // later fail) 2024 originalBlockIndexFront = originalBlockIndexSlotsUsed; 2025 } 2026 2027 // Insert a new block in the circular linked list 2028 auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>(); 2029 if (newBlock == nullptr) { 2030 pr_blockIndexFront = originalBlockIndexFront; 2031 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed; 2032 this->tailBlock = startBlock == nullptr ? 
firstAllocatedBlock : startBlock; 2033 return false; 2034 } 2035 2036 #if MCDBGQ_TRACKMEM 2037 newBlock->owner = this; 2038 #endif 2039 newBlock->ConcurrentQueue::Block::template set_all_empty<explicit_context>(); 2040 if (this->tailBlock == nullptr) { 2041 newBlock->next = newBlock; 2042 } 2043 else { 2044 newBlock->next = this->tailBlock->next; 2045 this->tailBlock->next = newBlock; 2046 } 2047 this->tailBlock = newBlock; 2048 firstAllocatedBlock = firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock; 2049 2050 ++pr_blockIndexSlotsUsed; 2051 2052 auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront]; 2053 entry.base = currentTailIndex; 2054 entry.block = this->tailBlock; 2055 pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1); 2056 } 2057 2058 // Excellent, all allocations succeeded. Reset each block's emptiness before we fill them up, and 2059 // publish the new block index front 2060 auto block = firstAllocatedBlock; 2061 while (true) { 2062 block->ConcurrentQueue::Block::template reset_empty<explicit_context>(); 2063 if (block == this->tailBlock) { 2064 break; 2065 } 2066 block = block->next; 2067 } 2068 2069 if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst)))) { 2070 blockIndex.load(std::memory_order_relaxed)->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release); 2071 } 2072 } 2073 2074 // Enqueue, one block at a time 2075 index_t newTailIndex = startTailIndex + static_cast<index_t>(count); 2076 currentTailIndex = startTailIndex; 2077 auto endBlock = this->tailBlock; 2078 this->tailBlock = startBlock; 2079 assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0); 2080 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) { 2081 this->tailBlock = firstAllocatedBlock; 2082 } 2083 while (true) { 2084 auto stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE); 2085 if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) { 2086 stopIndex = newTailIndex; 2087 } 2088 if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst)))) { 2089 while (currentTailIndex != stopIndex) { 2090 new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++); 2091 } 2092 } 2093 else { 2094 MOODYCAMEL_TRY { 2095 while (currentTailIndex != stopIndex) { 2096 // Must use copy constructor even if move constructor is available 2097 // because we may have to revert if there's an exception. 2098 // Sorry about the horrible templated next line, but it was the only way 2099 // to disable moving *at compile time*, which is important because a type 2100 // may only define a (noexcept) move constructor, and so calls to the 2101 // cctor will not compile, even if they are in an if branch that will never 2102 // be executed 2103 new ((*this->tailBlock)[currentTailIndex]) T(details::nomove_if<(bool)!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst)))>::eval(*itemFirst)); 2104 ++currentTailIndex; 2105 ++itemFirst; 2106 } 2107 } 2108 MOODYCAMEL_CATCH (...) { 2109 // Oh dear, an exception's been thrown -- destroy the elements that 2110 // were enqueued so far and revert the entire bulk operation (we'll keep 2111 // any allocated blocks in our linked list for later, though). 
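// (Summary of the rollback below, for orientation: every element constructed in
// [startTailIndex, currentTailIndex) is destroyed, pr_blockIndexFront/pr_blockIndexSlotsUsed are
// rolled back (any newly allocated block index is kept, as noted above), and since tailIndex is
// never advanced on this path, consumers cannot observe the partially enqueued items.)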
2112 auto constructedStopIndex = currentTailIndex; 2113 auto lastBlockEnqueued = this->tailBlock; 2114 2115 pr_blockIndexFront = originalBlockIndexFront; 2116 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed; 2117 this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock; 2118 2119 if (!details::is_trivially_destructible<T>::value) { 2120 auto block = startBlock; 2121 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) { 2122 block = firstAllocatedBlock; 2123 } 2124 currentTailIndex = startTailIndex; 2125 while (true) { 2126 auto stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE); 2127 if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) { 2128 stopIndex = constructedStopIndex; 2129 } 2130 while (currentTailIndex != stopIndex) { 2131 (*block)[currentTailIndex++]->~T(); 2132 } 2133 if (block == lastBlockEnqueued) { 2134 break; 2135 } 2136 block = block->next; 2137 } 2138 } 2139 MOODYCAMEL_RETHROW; 2140 } 2141 } 2142 2143 if (this->tailBlock == endBlock) { 2144 assert(currentTailIndex == newTailIndex); 2145 break; 2146 } 2147 this->tailBlock = this->tailBlock->next; 2148 } 2149 2150 if (!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst))) && firstAllocatedBlock != nullptr) { 2151 blockIndex.load(std::memory_order_relaxed)->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release); 2152 } 2153 2154 this->tailIndex.store(newTailIndex, std::memory_order_release); 2155 return true; 2156 } 2157 2158 template<typename It> 2159 size_t dequeue_bulk(It& itemFirst, size_t max) 2160 { 2161 auto tail = this->tailIndex.load(std::memory_order_relaxed); 2162 auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed); 2163 auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit)); 2164 if (details::circular_less_than<size_t>(0, desiredCount)) { 2165 desiredCount = desiredCount < max ? desiredCount : max; 2166 std::atomic_thread_fence(std::memory_order_acquire); 2167 2168 auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed); 2169 assert(overcommit <= myDequeueCount); 2170 2171 tail = this->tailIndex.load(std::memory_order_acquire); 2172 auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit)); 2173 if (details::circular_less_than<size_t>(0, actualCount)) { 2174 actualCount = desiredCount < actualCount ? desiredCount : actualCount; 2175 if (actualCount < desiredCount) { 2176 this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release); 2177 } 2178 2179 // Get the first index. Note that since there's guaranteed to be at least actualCount elements, this 2180 // will never exceed tail. 
2181 auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel); 2182 2183 // Determine which block the first element is in 2184 auto localBlockIndex = blockIndex.load(std::memory_order_acquire); 2185 auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire); 2186 2187 auto headBase = localBlockIndex->entries[localBlockIndexHead].base; 2188 auto firstBlockBaseIndex = firstIndex & ~static_cast<index_t>(BLOCK_SIZE - 1); 2189 auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(firstBlockBaseIndex - headBase) / BLOCK_SIZE); 2190 auto indexIndex = (localBlockIndexHead + offset) & (localBlockIndex->size - 1); 2191 2192 // Iterate the blocks and dequeue 2193 auto index = firstIndex; 2194 do { 2195 auto firstIndexInBlock = index; 2196 auto endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE); 2197 endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex; 2198 auto block = localBlockIndex->entries[indexIndex].block; 2199 if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, details::deref_noexcept(itemFirst) = std::move((*(*block)[index])))) { 2200 while (index != endIndex) { 2201 auto& el = *((*block)[index]); 2202 *itemFirst++ = std::move(el); 2203 el.~T(); 2204 ++index; 2205 } 2206 } 2207 else { 2208 MOODYCAMEL_TRY { 2209 while (index != endIndex) { 2210 auto& el = *((*block)[index]); 2211 *itemFirst = std::move(el); 2212 ++itemFirst; 2213 el.~T(); 2214 ++index; 2215 } 2216 } 2217 MOODYCAMEL_CATCH (...) { 2218 // It's too late to revert the dequeue, but we can make sure that all 2219 // the dequeued objects are properly destroyed and the block index 2220 // (and empty count) are properly updated before we propagate the exception 2221 do { 2222 block = localBlockIndex->entries[indexIndex].block; 2223 while (index != endIndex) { 2224 (*block)[index++]->~T(); 2225 } 2226 block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock)); 2227 indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1); 2228 2229 firstIndexInBlock = index; 2230 endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE); 2231 endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? 
firstIndex + static_cast<index_t>(actualCount) : endIndex; 2232 } while (index != firstIndex + actualCount); 2233 2234 MOODYCAMEL_RETHROW; 2235 } 2236 } 2237 block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock)); 2238 indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1); 2239 } while (index != firstIndex + actualCount); 2240 2241 return actualCount; 2242 } 2243 else { 2244 // Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent 2245 this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release); 2246 } 2247 } 2248 2249 return 0; 2250 } 2251 2252 private: 2253 struct BlockIndexEntry 2254 { 2255 index_t base; 2256 Block* block; 2257 }; 2258 2259 struct BlockIndexHeader 2260 { 2261 size_t size; 2262 std::atomic<size_t> front; // Current slot (not next, like pr_blockIndexFront) 2263 BlockIndexEntry* entries; 2264 void* prev; 2265 }; 2266 2267 2268 bool new_block_index(size_t numberOfFilledSlotsToExpose) 2269 { 2270 auto prevBlockSizeMask = pr_blockIndexSize - 1; 2271 2272 // Create the new block 2273 pr_blockIndexSize <<= 1; 2274 auto newRawPtr = static_cast<char*>((Traits::malloc)(sizeof(BlockIndexHeader) + std::alignment_of<BlockIndexEntry>::value - 1 + sizeof(BlockIndexEntry) * pr_blockIndexSize)); 2275 if (newRawPtr == nullptr) { 2276 pr_blockIndexSize >>= 1; // Reset to allow graceful retry 2277 return false; 2278 } 2279 2280 auto newBlockIndexEntries = reinterpret_cast<BlockIndexEntry*>(details::align_for<BlockIndexEntry>(newRawPtr + sizeof(BlockIndexHeader))); 2281 2282 // Copy in all the old indices, if any 2283 size_t j = 0; 2284 if (pr_blockIndexSlotsUsed != 0) { 2285 auto i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & prevBlockSizeMask; 2286 do { 2287 newBlockIndexEntries[j++] = pr_blockIndexEntries[i]; 2288 i = (i + 1) & prevBlockSizeMask; 2289 } while (i != pr_blockIndexFront); 2290 } 2291 2292 // Update everything 2293 auto header = new (newRawPtr) BlockIndexHeader; 2294 header->size = pr_blockIndexSize; 2295 header->front.store(numberOfFilledSlotsToExpose - 1, std::memory_order_relaxed); 2296 header->entries = newBlockIndexEntries; 2297 header->prev = pr_blockIndexRaw; // we link the new block to the old one so we can free it later 2298 2299 pr_blockIndexFront = j; 2300 pr_blockIndexEntries = newBlockIndexEntries; 2301 pr_blockIndexRaw = newRawPtr; 2302 blockIndex.store(header, std::memory_order_release); 2303 2304 return true; 2305 } 2306 2307 private: 2308 std::atomic<BlockIndexHeader*> blockIndex; 2309 2310 // To be used by producer only -- consumer must use the ones in referenced by blockIndex 2311 size_t pr_blockIndexSlotsUsed; 2312 size_t pr_blockIndexSize; 2313 size_t pr_blockIndexFront; // Next slot (not current) 2314 BlockIndexEntry* pr_blockIndexEntries; 2315 void* pr_blockIndexRaw; 2316 2317 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG 2318 public: 2319 ExplicitProducer* nextExplicitProducer; 2320 private: 2321 #endif 2322 2323 #if MCDBGQ_TRACKMEM 2324 friend struct MemStats; 2325 #endif 2326 }; 2327 2328 2329 ////////////////////////////////// 2330 // Implicit queue 2331 ////////////////////////////////// 2332 2333 struct ImplicitProducer : public ProducerBase 2334 { 2335 ImplicitProducer(ConcurrentQueue* parent) : 2336 ProducerBase(parent, false), 2337 nextBlockIndexCapacity(IMPLICIT_INITIAL_INDEX_SIZE), 2338 blockIndex(nullptr) 2339 { 2340 new_block_index(); 2341 } 2342 2343 ~ImplicitProducer() 2344 { 2345 // Note that 
since we're in the destructor we can assume that all enqueue/dequeue operations 2346 // completed already; this means that all undequeued elements are placed contiguously across 2347 // contiguous blocks, and that only the first and last remaining blocks can be only partially 2348 // empty (all other remaining blocks must be completely full). 2349 2350 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED 2351 // Unregister ourselves for thread termination notification 2352 if (!this->inactive.load(std::memory_order_relaxed)) { 2353 details::ThreadExitNotifier::unsubscribe(&threadExitListener); 2354 } 2355 #endif 2356 2357 // Destroy all remaining elements! 2358 auto tail = this->tailIndex.load(std::memory_order_relaxed); 2359 auto index = this->headIndex.load(std::memory_order_relaxed); 2360 Block* block = nullptr; 2361 assert(index == tail || details::circular_less_than(index, tail)); 2362 bool forceFreeLastBlock = index != tail; // If we enter the loop, then the last (tail) block will not be freed 2363 while (index != tail) { 2364 if ((index & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 || block == nullptr) { 2365 if (block != nullptr) { 2366 // Free the old block 2367 this->parent->add_block_to_free_list(block); 2368 } 2369 2370 block = get_block_index_entry_for_index(index)->value.load(std::memory_order_relaxed); 2371 } 2372 2373 ((*block)[index])->~T(); 2374 ++index; 2375 } 2376 // Even if the queue is empty, there's still one block that's not on the free list 2377 // (unless the head index reached the end of it, in which case the tail will be poised 2378 // to create a new block). 2379 if (this->tailBlock != nullptr && (forceFreeLastBlock || (tail & static_cast<index_t>(BLOCK_SIZE - 1)) != 0)) { 2380 this->parent->add_block_to_free_list(this->tailBlock); 2381 } 2382 2383 // Destroy block index 2384 auto localBlockIndex = blockIndex.load(std::memory_order_relaxed); 2385 if (localBlockIndex != nullptr) { 2386 for (size_t i = 0; i != localBlockIndex->capacity; ++i) { 2387 localBlockIndex->index[i]->~BlockIndexEntry(); 2388 } 2389 do { 2390 auto prev = localBlockIndex->prev; 2391 localBlockIndex->~BlockIndexHeader(); 2392 (Traits::free)(localBlockIndex); 2393 localBlockIndex = prev; 2394 } while (localBlockIndex != nullptr); 2395 } 2396 } 2397 2398 template<AllocationMode allocMode, typename U> 2399 inline bool enqueue(U&& element) 2400 { 2401 index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed); 2402 index_t newTailIndex = 1 + currentTailIndex; 2403 if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) { 2404 // We reached the end of a block, start a new one 2405 auto head = this->headIndex.load(std::memory_order_relaxed); 2406 assert(!details::circular_less_than<index_t>(currentTailIndex, head)); 2407 if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) { 2408 return false; 2409 } 2410 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX 2411 debug::DebugLock lock(mutex); 2412 #endif 2413 // Find out where we'll be inserting this block in the block index 2414 BlockIndexEntry* idxEntry; 2415 if (!insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) { 2416 return false; 2417 } 2418 2419 // Get ahold of a new block 2420 auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>(); 2421 if (newBlock == nullptr) { 2422 rewind_block_index_tail(); 2423 
idxEntry->value.store(nullptr, std::memory_order_relaxed); 2424 return false; 2425 } 2426 #if MCDBGQ_TRACKMEM 2427 newBlock->owner = this; 2428 #endif 2429 newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>(); 2430 2431 if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (nullptr) T(std::forward<U>(element)))) { 2432 // May throw, try to insert now before we publish the fact that we have this new block 2433 MOODYCAMEL_TRY { 2434 new ((*newBlock)[currentTailIndex]) T(std::forward<U>(element)); 2435 } 2436 MOODYCAMEL_CATCH (...) { 2437 rewind_block_index_tail(); 2438 idxEntry->value.store(nullptr, std::memory_order_relaxed); 2439 this->parent->add_block_to_free_list(newBlock); 2440 MOODYCAMEL_RETHROW; 2441 } 2442 } 2443 2444 // Insert the new block into the index 2445 idxEntry->value.store(newBlock, std::memory_order_relaxed); 2446 2447 this->tailBlock = newBlock; 2448 2449 if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new (nullptr) T(std::forward<U>(element)))) { 2450 this->tailIndex.store(newTailIndex, std::memory_order_release); 2451 return true; 2452 } 2453 } 2454 2455 // Enqueue 2456 new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element)); 2457 2458 this->tailIndex.store(newTailIndex, std::memory_order_release); 2459 return true; 2460 } 2461 2462 template<typename U> 2463 bool dequeue(U& element) 2464 { 2465 // See ExplicitProducer::dequeue for rationale and explanation 2466 index_t tail = this->tailIndex.load(std::memory_order_relaxed); 2467 index_t overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed); 2468 if (details::circular_less_than<index_t>(this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) { 2469 std::atomic_thread_fence(std::memory_order_acquire); 2470 2471 index_t myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed); 2472 assert(overcommit <= myDequeueCount); 2473 tail = this->tailIndex.load(std::memory_order_acquire); 2474 if (details::likely(details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) { 2475 index_t index = this->headIndex.fetch_add(1, std::memory_order_acq_rel); 2476 2477 // Determine which block the element is in 2478 auto entry = get_block_index_entry_for_index(index); 2479 2480 // Dequeue 2481 auto block = entry->value.load(std::memory_order_relaxed); 2482 auto& el = *((*block)[index]); 2483 2484 if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, element = std::move(el))) { 2485 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX 2486 // Note: Acquiring the mutex with every dequeue instead of only when a block 2487 // is released is very sub-optimal, but it is, after all, purely debug code. 
2488 debug::DebugLock lock(producer->mutex); 2489 #endif 2490 struct Guard { 2491 Block* block; 2492 index_t index; 2493 BlockIndexEntry* entry; 2494 ConcurrentQueue* parent; 2495 2496 ~Guard() 2497 { 2498 (*block)[index]->~T(); 2499 if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) { 2500 entry->value.store(nullptr, std::memory_order_relaxed); 2501 parent->add_block_to_free_list(block); 2502 } 2503 } 2504 } guard = { block, index, entry, this->parent }; 2505 2506 element = std::move(el); 2507 } 2508 else { 2509 element = std::move(el); 2510 el.~T(); 2511 2512 if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) { 2513 { 2514 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX 2515 debug::DebugLock lock(mutex); 2516 #endif 2517 // Add the block back into the global free pool (and remove from block index) 2518 entry->value.store(nullptr, std::memory_order_relaxed); 2519 } 2520 this->parent->add_block_to_free_list(block); // releases the above store 2521 } 2522 } 2523 2524 return true; 2525 } 2526 else { 2527 this->dequeueOvercommit.fetch_add(1, std::memory_order_release); 2528 } 2529 } 2530 2531 return false; 2532 } 2533 2534 template<AllocationMode allocMode, typename It> 2535 bool enqueue_bulk(It itemFirst, size_t count) 2536 { 2537 // First, we need to make sure we have enough room to enqueue all of the elements; 2538 // this means pre-allocating blocks and putting them in the block index (but only if 2539 // all the allocations succeeded). 2540 2541 // Note that the tailBlock we start off with may not be owned by us any more; 2542 // this happens if it was filled up exactly to the top (setting tailIndex to 2543 // the first index of the next block which is not yet allocated), then dequeued 2544 // completely (putting it on the free list) before we enqueue again. 
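// (For orientation, and in contrast with ExplicitProducer::enqueue_bulk above: an implicit producer
// tracks its blocks through (key = block base index, value = block) entries in a block index that
// consumers also read, and fully drained blocks go back to the global free list; that is why the
// failure paths below must null out each entry they inserted and rewind the index tail rather than
// just restore a couple of counters.)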
2545 2546 index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed); 2547 auto startBlock = this->tailBlock; 2548 Block* firstAllocatedBlock = nullptr; 2549 auto endBlock = this->tailBlock; 2550 2551 // Figure out how many blocks we'll need to allocate, and do so 2552 size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)); 2553 index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1); 2554 if (blockBaseDiff > 0) { 2555 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX 2556 debug::DebugLock lock(mutex); 2557 #endif 2558 do { 2559 blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE); 2560 currentTailIndex += static_cast<index_t>(BLOCK_SIZE); 2561 2562 // Find out where we'll be inserting this block in the block index 2563 BlockIndexEntry* idxEntry; 2564 Block* newBlock; 2565 bool indexInserted = false; 2566 auto head = this->headIndex.load(std::memory_order_relaxed); 2567 assert(!details::circular_less_than<index_t>(currentTailIndex, head)); 2568 bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head)); 2569 if (full || !(indexInserted = insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) || (newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>()) == nullptr) { 2570 // Index allocation or block allocation failed; revert any other allocations 2571 // and index insertions done so far for this operation 2572 if (indexInserted) { 2573 rewind_block_index_tail(); 2574 idxEntry->value.store(nullptr, std::memory_order_relaxed); 2575 } 2576 currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1); 2577 for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) { 2578 currentTailIndex += static_cast<index_t>(BLOCK_SIZE); 2579 idxEntry = get_block_index_entry_for_index(currentTailIndex); 2580 idxEntry->value.store(nullptr, std::memory_order_relaxed); 2581 rewind_block_index_tail(); 2582 } 2583 this->parent->add_blocks_to_free_list(firstAllocatedBlock); 2584 this->tailBlock = startBlock; 2585 2586 return false; 2587 } 2588 2589 #if MCDBGQ_TRACKMEM 2590 newBlock->owner = this; 2591 #endif 2592 newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>(); 2593 newBlock->next = nullptr; 2594 2595 // Insert the new block into the index 2596 idxEntry->value.store(newBlock, std::memory_order_relaxed); 2597 2598 // Store the chain of blocks so that we can undo if later allocations fail, 2599 // and so that we can find the blocks when we do the actual enqueueing 2600 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr) { 2601 assert(this->tailBlock != nullptr); 2602 this->tailBlock->next = newBlock; 2603 } 2604 this->tailBlock = newBlock; 2605 endBlock = newBlock; 2606 firstAllocatedBlock = firstAllocatedBlock == nullptr ? 
newBlock : firstAllocatedBlock; 2607 } while (blockBaseDiff > 0); 2608 } 2609 2610 // Enqueue, one block at a time 2611 index_t newTailIndex = startTailIndex + static_cast<index_t>(count); 2612 currentTailIndex = startTailIndex; 2613 this->tailBlock = startBlock; 2614 assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0); 2615 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) { 2616 this->tailBlock = firstAllocatedBlock; 2617 } 2618 while (true) { 2619 auto stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE); 2620 if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) { 2621 stopIndex = newTailIndex; 2622 } 2623 if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst)))) { 2624 while (currentTailIndex != stopIndex) { 2625 new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++); 2626 } 2627 } 2628 else { 2629 MOODYCAMEL_TRY { 2630 while (currentTailIndex != stopIndex) { 2631 new ((*this->tailBlock)[currentTailIndex]) T(details::nomove_if<(bool)!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (nullptr) T(details::deref_noexcept(itemFirst)))>::eval(*itemFirst)); 2632 ++currentTailIndex; 2633 ++itemFirst; 2634 } 2635 } 2636 MOODYCAMEL_CATCH (...) { 2637 auto constructedStopIndex = currentTailIndex; 2638 auto lastBlockEnqueued = this->tailBlock; 2639 2640 if (!details::is_trivially_destructible<T>::value) { 2641 auto block = startBlock; 2642 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) { 2643 block = firstAllocatedBlock; 2644 } 2645 currentTailIndex = startTailIndex; 2646 while (true) { 2647 auto stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE); 2648 if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) { 2649 stopIndex = constructedStopIndex; 2650 } 2651 while (currentTailIndex != stopIndex) { 2652 (*block)[currentTailIndex++]->~T(); 2653 } 2654 if (block == lastBlockEnqueued) { 2655 break; 2656 } 2657 block = block->next; 2658 } 2659 } 2660 2661 currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1); 2662 for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) { 2663 currentTailIndex += static_cast<index_t>(BLOCK_SIZE); 2664 auto idxEntry = get_block_index_entry_for_index(currentTailIndex); 2665 idxEntry->value.store(nullptr, std::memory_order_relaxed); 2666 rewind_block_index_tail(); 2667 } 2668 this->parent->add_blocks_to_free_list(firstAllocatedBlock); 2669 this->tailBlock = startBlock; 2670 MOODYCAMEL_RETHROW; 2671 } 2672 } 2673 2674 if (this->tailBlock == endBlock) { 2675 assert(currentTailIndex == newTailIndex); 2676 break; 2677 } 2678 this->tailBlock = this->tailBlock->next; 2679 } 2680 this->tailIndex.store(newTailIndex, std::memory_order_release); 2681 return true; 2682 } 2683 2684 template<typename It> 2685 size_t dequeue_bulk(It& itemFirst, size_t max) 2686 { 2687 auto tail = this->tailIndex.load(std::memory_order_relaxed); 2688 auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed); 2689 auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit)); 2690 if (details::circular_less_than<size_t>(0, desiredCount)) { 2691 desiredCount = desiredCount < max ? 
desiredCount : max; 2692 std::atomic_thread_fence(std::memory_order_acquire); 2693 2694 auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed); 2695 assert(overcommit <= myDequeueCount); 2696 2697 tail = this->tailIndex.load(std::memory_order_acquire); 2698 auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit)); 2699 if (details::circular_less_than<size_t>(0, actualCount)) { 2700 actualCount = desiredCount < actualCount ? desiredCount : actualCount; 2701 if (actualCount < desiredCount) { 2702 this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release); 2703 } 2704 2705 // Get the first index. Note that since there's guaranteed to be at least actualCount elements, this 2706 // will never exceed tail. 2707 auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel); 2708 2709 // Iterate the blocks and dequeue 2710 auto index = firstIndex; 2711 BlockIndexHeader* localBlockIndex; 2712 auto indexIndex = get_block_index_index_for_index(index, localBlockIndex); 2713 do { 2714 auto blockStartIndex = index; 2715 auto endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE); 2716 endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex; 2717 2718 auto entry = localBlockIndex->index[indexIndex]; 2719 auto block = entry->value.load(std::memory_order_relaxed); 2720 if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, details::deref_noexcept(itemFirst) = std::move((*(*block)[index])))) { 2721 while (index != endIndex) { 2722 auto& el = *((*block)[index]); 2723 *itemFirst++ = std::move(el); 2724 el.~T(); 2725 ++index; 2726 } 2727 } 2728 else { 2729 MOODYCAMEL_TRY { 2730 while (index != endIndex) { 2731 auto& el = *((*block)[index]); 2732 *itemFirst = std::move(el); 2733 ++itemFirst; 2734 el.~T(); 2735 ++index; 2736 } 2737 } 2738 MOODYCAMEL_CATCH (...) { 2739 do { 2740 entry = localBlockIndex->index[indexIndex]; 2741 block = entry->value.load(std::memory_order_relaxed); 2742 while (index != endIndex) { 2743 (*block)[index++]->~T(); 2744 } 2745 2746 if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) { 2747 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX 2748 debug::DebugLock lock(mutex); 2749 #endif 2750 entry->value.store(nullptr, std::memory_order_relaxed); 2751 this->parent->add_block_to_free_list(block); 2752 } 2753 indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1); 2754 2755 blockStartIndex = index; 2756 endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE); 2757 endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex; 2758 } while (index != firstIndex + actualCount); 2759 2760 MOODYCAMEL_RETHROW; 2761 } 2762 } 2763 if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) { 2764 { 2765 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX 2766 debug::DebugLock lock(mutex); 2767 #endif 2768 // Note that the set_many_empty above did a release, meaning that anybody who acquires the block 2769 // we're about to free can use it safely since our writes (and reads!) will have happened-before then. 
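						// Put differently, the ordering chain is: element destruction and set_many_empty
						// (release) on this thread, then the free-list add (another release), then the
						// acquire performed by whichever thread later pulls the block off the free list --
						// so the new owner can't observe our accesses as still in progress. Clearing the
						// entry's value below also marks this block-index slot as reusable, since
						// insert_block_index_entry() treats a slot with a null value (or an
						// INVALID_BLOCK_BASE key) as free.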
2770 entry->value.store(nullptr, std::memory_order_relaxed); 2771 } 2772 this->parent->add_block_to_free_list(block); // releases the above store 2773 } 2774 indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1); 2775 } while (index != firstIndex + actualCount); 2776 2777 return actualCount; 2778 } 2779 else { 2780 this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release); 2781 } 2782 } 2783 2784 return 0; 2785 } 2786 2787 private: 2788 // The block size must be > 1, so any number with the low bit set is an invalid block base index 2789 static const index_t INVALID_BLOCK_BASE = 1; 2790 2791 struct BlockIndexEntry 2792 { 2793 std::atomic<index_t> key; 2794 std::atomic<Block*> value; 2795 }; 2796 2797 struct BlockIndexHeader 2798 { 2799 size_t capacity; 2800 std::atomic<size_t> tail; 2801 BlockIndexEntry* entries; 2802 BlockIndexEntry** index; 2803 BlockIndexHeader* prev; 2804 }; 2805 2806 template<AllocationMode allocMode> 2807 inline bool insert_block_index_entry(BlockIndexEntry*& idxEntry, index_t blockStartIndex) 2808 { 2809 auto localBlockIndex = blockIndex.load(std::memory_order_relaxed); // We're the only writer thread, relaxed is OK 2810 auto newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1); 2811 idxEntry = localBlockIndex->index[newTail]; 2812 if (idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE || 2813 idxEntry->value.load(std::memory_order_relaxed) == nullptr) { 2814 2815 idxEntry->key.store(blockStartIndex, std::memory_order_relaxed); 2816 localBlockIndex->tail.store(newTail, std::memory_order_release); 2817 return true; 2818 } 2819 2820 // No room in the old block index, try to allocate another one! 2821 if (allocMode == CannotAlloc || !new_block_index()) { 2822 return false; 2823 } 2824 localBlockIndex = blockIndex.load(std::memory_order_relaxed); 2825 newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1); 2826 idxEntry = localBlockIndex->index[newTail]; 2827 assert(idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE); 2828 idxEntry->key.store(blockStartIndex, std::memory_order_relaxed); 2829 localBlockIndex->tail.store(newTail, std::memory_order_release); 2830 return true; 2831 } 2832 2833 inline void rewind_block_index_tail() 2834 { 2835 auto localBlockIndex = blockIndex.load(std::memory_order_relaxed); 2836 localBlockIndex->tail.store((localBlockIndex->tail.load(std::memory_order_relaxed) - 1) & (localBlockIndex->capacity - 1), std::memory_order_relaxed); 2837 } 2838 2839 inline BlockIndexEntry* get_block_index_entry_for_index(index_t index) const 2840 { 2841 BlockIndexHeader* localBlockIndex; 2842 auto idx = get_block_index_index_for_index(index, localBlockIndex); 2843 return localBlockIndex->index[idx]; 2844 } 2845 2846 inline size_t get_block_index_index_for_index(index_t index, BlockIndexHeader*& localBlockIndex) const 2847 { 2848 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX 2849 debug::DebugLock lock(mutex); 2850 #endif 2851 index &= ~static_cast<index_t>(BLOCK_SIZE - 1); 2852 localBlockIndex = blockIndex.load(std::memory_order_acquire); 2853 auto tail = localBlockIndex->tail.load(std::memory_order_acquire); 2854 auto tailBase = localBlockIndex->index[tail]->key.load(std::memory_order_relaxed); 2855 assert(tailBase != INVALID_BLOCK_BASE); 2856 // Note: Must use division instead of shift because the index may wrap around, causing a negative 2857 // offset, whose negativity we want to preserve 2858 auto 
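		// Worked example with purely illustrative numbers: if BLOCK_SIZE is 32, the tail
		// entry's key (tailBase) is 128, and we're looking up block base 64, then the signed
		// difference is 64 - 128 == -64, the division gives -2, and
		// (tail + static_cast<size_t>(-2)) & (capacity - 1) steps two slots backwards
		// (circularly) from the tail entry -- the slot whose key is 64, as the assert verifies.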
offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(index - tailBase) / BLOCK_SIZE); 2859 size_t idx = (tail + offset) & (localBlockIndex->capacity - 1); 2860 assert(localBlockIndex->index[idx]->key.load(std::memory_order_relaxed) == index && localBlockIndex->index[idx]->value.load(std::memory_order_relaxed) != nullptr); 2861 return idx; 2862 } 2863 2864 bool new_block_index() 2865 { 2866 auto prev = blockIndex.load(std::memory_order_relaxed); 2867 size_t prevCapacity = prev == nullptr ? 0 : prev->capacity; 2868 auto entryCount = prev == nullptr ? nextBlockIndexCapacity : prevCapacity; 2869 auto raw = static_cast<char*>((Traits::malloc)( 2870 sizeof(BlockIndexHeader) + 2871 std::alignment_of<BlockIndexEntry>::value - 1 + sizeof(BlockIndexEntry) * entryCount + 2872 std::alignment_of<BlockIndexEntry*>::value - 1 + sizeof(BlockIndexEntry*) * nextBlockIndexCapacity)); 2873 if (raw == nullptr) { 2874 return false; 2875 } 2876 2877 auto header = new (raw) BlockIndexHeader; 2878 auto entries = reinterpret_cast<BlockIndexEntry*>(details::align_for<BlockIndexEntry>(raw + sizeof(BlockIndexHeader))); 2879 auto index = reinterpret_cast<BlockIndexEntry**>(details::align_for<BlockIndexEntry*>(reinterpret_cast<char*>(entries) + sizeof(BlockIndexEntry) * entryCount)); 2880 if (prev != nullptr) { 2881 auto prevTail = prev->tail.load(std::memory_order_relaxed); 2882 auto prevPos = prevTail; 2883 size_t i = 0; 2884 do { 2885 prevPos = (prevPos + 1) & (prev->capacity - 1); 2886 index[i++] = prev->index[prevPos]; 2887 } while (prevPos != prevTail); 2888 assert(i == prevCapacity); 2889 } 2890 for (size_t i = 0; i != entryCount; ++i) { 2891 new (entries + i) BlockIndexEntry; 2892 entries[i].key.store(INVALID_BLOCK_BASE, std::memory_order_relaxed); 2893 index[prevCapacity + i] = entries + i; 2894 } 2895 header->prev = prev; 2896 header->entries = entries; 2897 header->index = index; 2898 header->capacity = nextBlockIndexCapacity; 2899 header->tail.store((prevCapacity - 1) & (nextBlockIndexCapacity - 1), std::memory_order_relaxed); 2900 2901 blockIndex.store(header, std::memory_order_release); 2902 2903 nextBlockIndexCapacity <<= 1; 2904 2905 return true; 2906 } 2907 2908 private: 2909 size_t nextBlockIndexCapacity; 2910 std::atomic<BlockIndexHeader*> blockIndex; 2911 2912 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED 2913 public: 2914 details::ThreadExitListener threadExitListener; 2915 private: 2916 #endif 2917 2918 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG 2919 public: 2920 ImplicitProducer* nextImplicitProducer; 2921 private: 2922 #endif 2923 2924 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX 2925 mutable debug::DebugMutex mutex; 2926 #endif 2927 #if MCDBGQ_TRACKMEM 2928 friend struct MemStats; 2929 #endif 2930 }; 2931 2932 2933 ////////////////////////////////// 2934 // Block pool manipulation 2935 ////////////////////////////////// 2936 2937 void populate_initial_block_list(size_t blockCount) 2938 { 2939 initialBlockPoolSize = blockCount; 2940 if (initialBlockPoolSize == 0) { 2941 initialBlockPool = nullptr; 2942 return; 2943 } 2944 2945 initialBlockPool = create_array<Block>(blockCount); 2946 if (initialBlockPool == nullptr) { 2947 initialBlockPoolSize = 0; 2948 } 2949 for (size_t i = 0; i < initialBlockPoolSize; ++i) { 2950 initialBlockPool[i].dynamicallyAllocated = false; 2951 } 2952 } 2953 2954 inline Block* try_get_block_from_initial_pool() 2955 { 2956 if (initialBlockPoolIndex.load(std::memory_order_relaxed) >= initialBlockPoolSize) { 2957 return nullptr; 2958 } 2959 2960 
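		// The relaxed check above is just a fast-path filter; the fetch_add below is what
		// actually claims a slot. Every caller receives a distinct index, so the worst case
		// under contention is overshooting initialBlockPoolSize, which the bounds check on
		// the return statement turns into nullptr -- requisition_block() then falls back to
		// the free list or, when allowed, a fresh allocation.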
		auto index = initialBlockPoolIndex.fetch_add(1, std::memory_order_relaxed);

		return index < initialBlockPoolSize ? (initialBlockPool + index) : nullptr;
	}

	inline void add_block_to_free_list(Block* block)
	{
#if MCDBGQ_TRACKMEM
		block->owner = nullptr;
#endif
		freeList.add(block);
	}

	inline void add_blocks_to_free_list(Block* block)
	{
		while (block != nullptr) {
			auto next = block->next;
			add_block_to_free_list(block);
			block = next;
		}
	}

	inline Block* try_get_block_from_free_list()
	{
		return freeList.try_get();
	}

	// Gets a free block from one of the memory pools, or allocates a new one (if applicable)
	template<AllocationMode canAlloc>
	Block* requisition_block()
	{
		auto block = try_get_block_from_initial_pool();
		if (block != nullptr) {
			return block;
		}

		block = try_get_block_from_free_list();
		if (block != nullptr) {
			return block;
		}

		if (canAlloc == CanAlloc) {
			return create<Block>();
		}

		return nullptr;
	}


#if MCDBGQ_TRACKMEM
	public:
	struct MemStats {
		size_t allocatedBlocks;
		size_t usedBlocks;
		size_t freeBlocks;
		size_t ownedBlocksExplicit;
		size_t ownedBlocksImplicit;
		size_t implicitProducers;
		size_t explicitProducers;
		size_t elementsEnqueued;
		size_t blockClassBytes;
		size_t queueClassBytes;
		size_t implicitBlockIndexBytes;
		size_t explicitBlockIndexBytes;

		friend class ConcurrentQueue;

	private:
		static MemStats getFor(ConcurrentQueue* q)
		{
			MemStats stats = { 0 };

			stats.elementsEnqueued = q->size_approx();

			auto block = q->freeList.head_unsafe();
			while (block != nullptr) {
				++stats.allocatedBlocks;
				++stats.freeBlocks;
				block = block->freeListNext.load(std::memory_order_relaxed);
			}

			for (auto ptr = q->producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
				bool implicit = dynamic_cast<ImplicitProducer*>(ptr) != nullptr;
				stats.implicitProducers += implicit ? 1 : 0;
				stats.explicitProducers += implicit ?
0 : 1; 3045 3046 if (implicit) { 3047 auto prod = static_cast<ImplicitProducer*>(ptr); 3048 stats.queueClassBytes += sizeof(ImplicitProducer); 3049 auto head = prod->headIndex.load(std::memory_order_relaxed); 3050 auto tail = prod->tailIndex.load(std::memory_order_relaxed); 3051 auto hash = prod->blockIndex.load(std::memory_order_relaxed); 3052 if (hash != nullptr) { 3053 for (size_t i = 0; i != hash->capacity; ++i) { 3054 if (hash->index[i]->key.load(std::memory_order_relaxed) != ImplicitProducer::INVALID_BLOCK_BASE && hash->index[i]->value.load(std::memory_order_relaxed) != nullptr) { 3055 ++stats.allocatedBlocks; 3056 ++stats.ownedBlocksImplicit; 3057 } 3058 } 3059 stats.implicitBlockIndexBytes += hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry); 3060 for (; hash != nullptr; hash = hash->prev) { 3061 stats.implicitBlockIndexBytes += sizeof(typename ImplicitProducer::BlockIndexHeader) + hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry*); 3062 } 3063 } 3064 for (; details::circular_less_than<index_t>(head, tail); head += BLOCK_SIZE) { 3065 //auto block = prod->get_block_index_entry_for_index(head); 3066 ++stats.usedBlocks; 3067 } 3068 } 3069 else { 3070 auto prod = static_cast<ExplicitProducer*>(ptr); 3071 stats.queueClassBytes += sizeof(ExplicitProducer); 3072 auto tailBlock = prod->tailBlock; 3073 bool wasNonEmpty = false; 3074 if (tailBlock != nullptr) { 3075 auto block = tailBlock; 3076 do { 3077 ++stats.allocatedBlocks; 3078 if (!block->ConcurrentQueue::Block::template is_empty<explicit_context>() || wasNonEmpty) { 3079 ++stats.usedBlocks; 3080 wasNonEmpty = wasNonEmpty || block != tailBlock; 3081 } 3082 ++stats.ownedBlocksExplicit; 3083 block = block->next; 3084 } while (block != tailBlock); 3085 } 3086 auto index = prod->blockIndex.load(std::memory_order_relaxed); 3087 while (index != nullptr) { 3088 stats.explicitBlockIndexBytes += sizeof(typename ExplicitProducer::BlockIndexHeader) + index->size * sizeof(typename ExplicitProducer::BlockIndexEntry); 3089 index = static_cast<typename ExplicitProducer::BlockIndexHeader*>(index->prev); 3090 } 3091 } 3092 } 3093 3094 auto freeOnInitialPool = q->initialBlockPoolIndex.load(std::memory_order_relaxed) >= q->initialBlockPoolSize ? 0 : q->initialBlockPoolSize - q->initialBlockPoolIndex.load(std::memory_order_relaxed); 3095 stats.allocatedBlocks += freeOnInitialPool; 3096 stats.freeBlocks += freeOnInitialPool; 3097 3098 stats.blockClassBytes = sizeof(Block) * stats.allocatedBlocks; 3099 stats.queueClassBytes += sizeof(ConcurrentQueue); 3100 3101 return stats; 3102 } 3103 }; 3104 3105 // For debugging only. Not thread-safe. 
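	// Illustrative use in a debug build (MCDBGQ_TRACKMEM must be defined before this header
	// is included; the surrounding code is just an example, not part of the queue's API):
	//
	//   moodycamel::ConcurrentQueue<int> q;
	//   // ... enqueue and dequeue from various threads, then, once they're quiescent:
	//   auto stats = q.getMemStats();
	//   std::printf("%zu blocks allocated, %zu in use, %zu free\n",
	//               stats.allocatedBlocks, stats.usedBlocks, stats.freeBlocks);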
3106 MemStats getMemStats() 3107 { 3108 return MemStats::getFor(this); 3109 } 3110 private: 3111 friend struct MemStats; 3112 #endif 3113 3114 3115 ////////////////////////////////// 3116 // Producer list manipulation 3117 ////////////////////////////////// 3118 3119 ProducerBase* recycle_or_create_producer(bool isExplicit) 3120 { 3121 bool recycled; 3122 return recycle_or_create_producer(isExplicit, recycled); 3123 } 3124 3125 ProducerBase* recycle_or_create_producer(bool isExplicit, bool& recycled) 3126 { 3127 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH 3128 debug::DebugLock lock(implicitProdMutex); 3129 #endif 3130 // Try to re-use one first 3131 for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) { 3132 if (ptr->inactive.load(std::memory_order_relaxed) && ptr->isExplicit == isExplicit) { 3133 bool expected = true; 3134 if (ptr->inactive.compare_exchange_strong(expected, /* desired */ false, std::memory_order_acquire, std::memory_order_relaxed)) { 3135 // We caught one! It's been marked as activated, the caller can have it 3136 recycled = true; 3137 return ptr; 3138 } 3139 } 3140 } 3141 3142 recycled = false; 3143 return add_producer(isExplicit ? static_cast<ProducerBase*>(create<ExplicitProducer>(this)) : create<ImplicitProducer>(this)); 3144 } 3145 3146 ProducerBase* add_producer(ProducerBase* producer) 3147 { 3148 // Handle failed memory allocation 3149 if (producer == nullptr) { 3150 return nullptr; 3151 } 3152 3153 producerCount.fetch_add(1, std::memory_order_relaxed); 3154 3155 // Add it to the lock-free list 3156 auto prevTail = producerListTail.load(std::memory_order_relaxed); 3157 do { 3158 producer->next = prevTail; 3159 } while (!producerListTail.compare_exchange_weak(prevTail, producer, std::memory_order_release, std::memory_order_relaxed)); 3160 3161 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG 3162 if (producer->isExplicit) { 3163 auto prevTailExplicit = explicitProducers.load(std::memory_order_relaxed); 3164 do { 3165 static_cast<ExplicitProducer*>(producer)->nextExplicitProducer = prevTailExplicit; 3166 } while (!explicitProducers.compare_exchange_weak(prevTailExplicit, static_cast<ExplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed)); 3167 } 3168 else { 3169 auto prevTailImplicit = implicitProducers.load(std::memory_order_relaxed); 3170 do { 3171 static_cast<ImplicitProducer*>(producer)->nextImplicitProducer = prevTailImplicit; 3172 } while (!implicitProducers.compare_exchange_weak(prevTailImplicit, static_cast<ImplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed)); 3173 } 3174 #endif 3175 3176 return producer; 3177 } 3178 3179 void reown_producers() 3180 { 3181 // After another instance is moved-into/swapped-with this one, all the 3182 // producers we stole still think their parents are the other queue. 3183 // So fix them up! 
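		// Concretely (illustrative): after `ConcurrentQueue<int> b(std::move(a));` or
		// `swap(a, b)`, every producer that used to belong to `a` is reachable from `b`'s
		// producer list but still carries a `parent` pointer into `a`; the loop below
		// re-points each one at `this` so its later block requisitions and free-list
		// operations target the correct queue.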
3184 for (auto ptr = producerListTail.load(std::memory_order_relaxed); ptr != nullptr; ptr = ptr->next_prod()) { 3185 ptr->parent = this; 3186 } 3187 } 3188 3189 3190 ////////////////////////////////// 3191 // Implicit producer hash 3192 ////////////////////////////////// 3193 3194 struct ImplicitProducerKVP 3195 { 3196 std::atomic<details::thread_id_t> key; 3197 ImplicitProducer* value; // No need for atomicity since it's only read by the thread that sets it in the first place 3198 3199 ImplicitProducerKVP() : value(nullptr) { } 3200 3201 ImplicitProducerKVP(ImplicitProducerKVP&& other) MOODYCAMEL_NOEXCEPT 3202 { 3203 key.store(other.key.load(std::memory_order_relaxed), std::memory_order_relaxed); 3204 value = other.value; 3205 } 3206 3207 inline ImplicitProducerKVP& operator=(ImplicitProducerKVP&& other) MOODYCAMEL_NOEXCEPT 3208 { 3209 swap(other); 3210 return *this; 3211 } 3212 3213 inline void swap(ImplicitProducerKVP& other) MOODYCAMEL_NOEXCEPT 3214 { 3215 if (this != &other) { 3216 details::swap_relaxed(key, other.key); 3217 std::swap(value, other.value); 3218 } 3219 } 3220 }; 3221 3222 template<typename XT, typename XTraits> 3223 friend void moodycamel::swap(typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP&, typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP&) MOODYCAMEL_NOEXCEPT; 3224 3225 struct ImplicitProducerHash 3226 { 3227 size_t capacity; 3228 ImplicitProducerKVP* entries; 3229 ImplicitProducerHash* prev; 3230 }; 3231 3232 inline void populate_initial_implicit_producer_hash() 3233 { 3234 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return; 3235 3236 implicitProducerHashCount.store(0, std::memory_order_relaxed); 3237 auto hash = &initialImplicitProducerHash; 3238 hash->capacity = INITIAL_IMPLICIT_PRODUCER_HASH_SIZE; 3239 hash->entries = &initialImplicitProducerHashEntries[0]; 3240 for (size_t i = 0; i != INITIAL_IMPLICIT_PRODUCER_HASH_SIZE; ++i) { 3241 initialImplicitProducerHashEntries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed); 3242 } 3243 hash->prev = nullptr; 3244 implicitProducerHash.store(hash, std::memory_order_relaxed); 3245 } 3246 3247 void swap_implicit_producer_hashes(ConcurrentQueue& other) 3248 { 3249 if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return; 3250 3251 // Swap (assumes our implicit producer hash is initialized) 3252 initialImplicitProducerHashEntries.swap(other.initialImplicitProducerHashEntries); 3253 initialImplicitProducerHash.entries = &initialImplicitProducerHashEntries[0]; 3254 other.initialImplicitProducerHash.entries = &other.initialImplicitProducerHashEntries[0]; 3255 3256 details::swap_relaxed(implicitProducerHashCount, other.implicitProducerHashCount); 3257 3258 details::swap_relaxed(implicitProducerHash, other.implicitProducerHash); 3259 if (implicitProducerHash.load(std::memory_order_relaxed) == &other.initialImplicitProducerHash) { 3260 implicitProducerHash.store(&initialImplicitProducerHash, std::memory_order_relaxed); 3261 } 3262 else { 3263 ImplicitProducerHash* hash; 3264 for (hash = implicitProducerHash.load(std::memory_order_relaxed); hash->prev != &other.initialImplicitProducerHash; hash = hash->prev) { 3265 continue; 3266 } 3267 hash->prev = &initialImplicitProducerHash; 3268 } 3269 if (other.implicitProducerHash.load(std::memory_order_relaxed) == &initialImplicitProducerHash) { 3270 other.implicitProducerHash.store(&other.initialImplicitProducerHash, std::memory_order_relaxed); 3271 } 3272 else { 3273 ImplicitProducerHash* hash; 3274 for (hash = 
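			// (This loop, like the matching one above, repairs the tail of a hash chain after
			// the pointer swap: every chain of resized tables ends at a queue's embedded
			// initialImplicitProducerHash, which cannot itself be swapped by pointer, so the
			// node whose `prev` still refers to the old owner's embedded table is re-pointed
			// at the new owner's.)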
other.implicitProducerHash.load(std::memory_order_relaxed); hash->prev != &initialImplicitProducerHash; hash = hash->prev) { 3275 continue; 3276 } 3277 hash->prev = &other.initialImplicitProducerHash; 3278 } 3279 } 3280 3281 // Only fails (returns nullptr) if memory allocation fails 3282 ImplicitProducer* get_or_add_implicit_producer() 3283 { 3284 // Note that since the data is essentially thread-local (key is thread ID), 3285 // there's a reduced need for fences (memory ordering is already consistent 3286 // for any individual thread), except for the current table itself. 3287 3288 // Start by looking for the thread ID in the current and all previous hash tables. 3289 // If it's not found, it must not be in there yet, since this same thread would 3290 // have added it previously to one of the tables that we traversed. 3291 3292 // Code and algorithm adapted from http://preshing.com/20130605/the-worlds-simplest-lock-free-hash-table 3293 3294 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH 3295 debug::DebugLock lock(implicitProdMutex); 3296 #endif 3297 3298 auto id = details::thread_id(); 3299 auto hashedId = details::hash_thread_id(id); 3300 3301 auto mainHash = implicitProducerHash.load(std::memory_order_acquire); 3302 for (auto hash = mainHash; hash != nullptr; hash = hash->prev) { 3303 // Look for the id in this hash 3304 auto index = hashedId; 3305 while (true) { // Not an infinite loop because at least one slot is free in the hash table 3306 index &= hash->capacity - 1; 3307 3308 auto probedKey = hash->entries[index].key.load(std::memory_order_relaxed); 3309 if (probedKey == id) { 3310 // Found it! If we had to search several hashes deep, though, we should lazily add it 3311 // to the current main hash table to avoid the extended search next time. 3312 // Note there's guaranteed to be room in the current hash table since every subsequent 3313 // table implicitly reserves space for all previous tables (there's only one 3314 // implicitProducerHashCount). 3315 auto value = hash->entries[index].value; 3316 if (hash != mainHash) { 3317 index = hashedId; 3318 while (true) { 3319 index &= mainHash->capacity - 1; 3320 probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed); 3321 auto empty = details::invalid_thread_id; 3322 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED 3323 auto reusable = details::invalid_thread_id2; 3324 if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed)) || 3325 (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire))) { 3326 #else 3327 if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed))) { 3328 #endif 3329 mainHash->entries[index].value = value; 3330 break; 3331 } 3332 ++index; 3333 } 3334 } 3335 3336 return value; 3337 } 3338 if (probedKey == details::invalid_thread_id) { 3339 break; // Not in this hash table 3340 } 3341 ++index; 3342 } 3343 } 3344 3345 // Insert! 3346 auto newCount = 1 + implicitProducerHashCount.fetch_add(1, std::memory_order_relaxed); 3347 while (true) { 3348 if (newCount >= (mainHash->capacity >> 1) && !implicitProducerHashResizeInProgress.test_and_set(std::memory_order_acquire)) { 3349 // We've acquired the resize lock, try to allocate a bigger hash table. 
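				// (Growth policy: the capacity is doubled -- repeatedly if needed -- until the
				// logical count is under half of the new capacity, keeping probe sequences short;
				// the old table stays linked through `prev`, so entries that only ever existed
				// there are still found by the lookup loop above.)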
3350 // Note the acquire fence synchronizes with the release fence at the end of this block, and hence when 3351 // we reload implicitProducerHash it must be the most recent version (it only gets changed within this 3352 // locked block). 3353 mainHash = implicitProducerHash.load(std::memory_order_acquire); 3354 if (newCount >= (mainHash->capacity >> 1)) { 3355 auto newCapacity = mainHash->capacity << 1; 3356 while (newCount >= (newCapacity >> 1)) { 3357 newCapacity <<= 1; 3358 } 3359 auto raw = static_cast<char*>((Traits::malloc)(sizeof(ImplicitProducerHash) + std::alignment_of<ImplicitProducerKVP>::value - 1 + sizeof(ImplicitProducerKVP) * newCapacity)); 3360 if (raw == nullptr) { 3361 // Allocation failed 3362 implicitProducerHashCount.fetch_add(-1, std::memory_order_relaxed); 3363 implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed); 3364 return nullptr; 3365 } 3366 3367 auto newHash = new (raw) ImplicitProducerHash; 3368 newHash->capacity = newCapacity; 3369 newHash->entries = reinterpret_cast<ImplicitProducerKVP*>(details::align_for<ImplicitProducerKVP>(raw + sizeof(ImplicitProducerHash))); 3370 for (size_t i = 0; i != newCapacity; ++i) { 3371 new (newHash->entries + i) ImplicitProducerKVP; 3372 newHash->entries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed); 3373 } 3374 newHash->prev = mainHash; 3375 implicitProducerHash.store(newHash, std::memory_order_release); 3376 implicitProducerHashResizeInProgress.clear(std::memory_order_release); 3377 mainHash = newHash; 3378 } 3379 else { 3380 implicitProducerHashResizeInProgress.clear(std::memory_order_release); 3381 } 3382 } 3383 3384 // If it's < three-quarters full, add to the old one anyway so that we don't have to wait for the next table 3385 // to finish being allocated by another thread (and if we just finished allocating above, the condition will 3386 // always be true) 3387 if (newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2)) { 3388 bool recycled; 3389 auto producer = static_cast<ImplicitProducer*>(recycle_or_create_producer(false, recycled)); 3390 if (producer == nullptr) { 3391 implicitProducerHashCount.fetch_add(-1, std::memory_order_relaxed); 3392 return nullptr; 3393 } 3394 if (recycled) { 3395 implicitProducerHashCount.fetch_add(-1, std::memory_order_relaxed); 3396 } 3397 3398 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED 3399 producer->threadExitListener.callback = &ConcurrentQueue::implicit_producer_thread_exited_callback; 3400 producer->threadExitListener.userData = producer; 3401 details::ThreadExitNotifier::subscribe(&producer->threadExitListener); 3402 #endif 3403 3404 auto index = hashedId; 3405 while (true) { 3406 index &= mainHash->capacity - 1; 3407 auto probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed); 3408 3409 auto empty = details::invalid_thread_id; 3410 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED 3411 auto reusable = details::invalid_thread_id2; 3412 if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed)) || 3413 (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire))) { 3414 #else 3415 if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed))) { 3416 #endif 3417 mainHash->entries[index].value = producer; 3418 break; 3419 } 3420 ++index; 3421 } 3422 return producer; 3423 } 3424 3425 // Hmm, the old hash is quite full and somebody else is busy 
allocating a new one. 3426 // We need to wait for the allocating thread to finish (if it succeeds, we add, if not, 3427 // we try to allocate ourselves). 3428 mainHash = implicitProducerHash.load(std::memory_order_acquire); 3429 } 3430 } 3431 3432 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED 3433 void implicit_producer_thread_exited(ImplicitProducer* producer) 3434 { 3435 // Remove from thread exit listeners 3436 details::ThreadExitNotifier::unsubscribe(&producer->threadExitListener); 3437 3438 // Remove from hash 3439 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH 3440 debug::DebugLock lock(implicitProdMutex); 3441 #endif 3442 auto hash = implicitProducerHash.load(std::memory_order_acquire); 3443 assert(hash != nullptr); // The thread exit listener is only registered if we were added to a hash in the first place 3444 auto id = details::thread_id(); 3445 auto hashedId = details::hash_thread_id(id); 3446 details::thread_id_t probedKey; 3447 3448 // We need to traverse all the hashes just in case other threads aren't on the current one yet and are 3449 // trying to add an entry thinking there's a free slot (because they reused a producer) 3450 for (; hash != nullptr; hash = hash->prev) { 3451 auto index = hashedId; 3452 do { 3453 index &= hash->capacity - 1; 3454 probedKey = hash->entries[index].key.load(std::memory_order_relaxed); 3455 if (probedKey == id) { 3456 hash->entries[index].key.store(details::invalid_thread_id2, std::memory_order_release); 3457 break; 3458 } 3459 ++index; 3460 } while (probedKey != details::invalid_thread_id); // Can happen if the hash has changed but we weren't put back in it yet, or if we weren't added to this hash in the first place 3461 } 3462 3463 // Mark the queue as being recyclable 3464 producer->inactive.store(true, std::memory_order_release); 3465 } 3466 3467 static void implicit_producer_thread_exited_callback(void* userData) 3468 { 3469 auto producer = static_cast<ImplicitProducer*>(userData); 3470 auto queue = producer->parent; 3471 queue->implicit_producer_thread_exited(producer); 3472 } 3473 #endif 3474 3475 ////////////////////////////////// 3476 // Utility functions 3477 ////////////////////////////////// 3478 3479 template<typename U> 3480 static inline U* create_array(size_t count) 3481 { 3482 assert(count > 0); 3483 auto p = static_cast<U*>((Traits::malloc)(sizeof(U) * count)); 3484 if (p == nullptr) { 3485 return nullptr; 3486 } 3487 3488 for (size_t i = 0; i != count; ++i) { 3489 new (p + i) U(); 3490 } 3491 return p; 3492 } 3493 3494 template<typename U> 3495 static inline void destroy_array(U* p, size_t count) 3496 { 3497 if (p != nullptr) { 3498 assert(count > 0); 3499 for (size_t i = count; i != 0; ) { 3500 (p + --i)->~U(); 3501 } 3502 (Traits::free)(p); 3503 } 3504 } 3505 3506 template<typename U> 3507 static inline U* create() 3508 { 3509 auto p = (Traits::malloc)(sizeof(U)); 3510 return p != nullptr ? new (p) U : nullptr; 3511 } 3512 3513 template<typename U, typename A1> 3514 static inline U* create(A1&& a1) 3515 { 3516 auto p = (Traits::malloc)(sizeof(U)); 3517 return p != nullptr ? 
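	// These helpers intentionally pair (Traits::malloc) with placement-new -- and, in
	// destroy() below, an explicit destructor call with (Traits::free) -- rather than using
	// operator new/delete, so a custom Traits type can route every allocation the queue
	// makes (blocks, block indices, producer hashes, producers) through its own allocator.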
new (p) U(std::forward<A1>(a1)) : nullptr; 3518 } 3519 3520 template<typename U> 3521 static inline void destroy(U* p) 3522 { 3523 if (p != nullptr) { 3524 p->~U(); 3525 } 3526 (Traits::free)(p); 3527 } 3528 3529 private: 3530 std::atomic<ProducerBase*> producerListTail; 3531 std::atomic<std::uint32_t> producerCount; 3532 3533 std::atomic<size_t> initialBlockPoolIndex; 3534 Block* initialBlockPool; 3535 size_t initialBlockPoolSize; 3536 3537 #if !MCDBGQ_USEDEBUGFREELIST 3538 FreeList<Block> freeList; 3539 #else 3540 debug::DebugFreeList<Block> freeList; 3541 #endif 3542 3543 std::atomic<ImplicitProducerHash*> implicitProducerHash; 3544 std::atomic<size_t> implicitProducerHashCount; // Number of slots logically used 3545 ImplicitProducerHash initialImplicitProducerHash; 3546 std::array<ImplicitProducerKVP, INITIAL_IMPLICIT_PRODUCER_HASH_SIZE> initialImplicitProducerHashEntries; 3547 std::atomic_flag implicitProducerHashResizeInProgress; 3548 3549 std::atomic<std::uint32_t> nextExplicitConsumerId; 3550 std::atomic<std::uint32_t> globalExplicitConsumerOffset; 3551 3552 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH 3553 debug::DebugMutex implicitProdMutex; 3554 #endif 3555 3556 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG 3557 std::atomic<ExplicitProducer*> explicitProducers; 3558 std::atomic<ImplicitProducer*> implicitProducers; 3559 #endif 3560 }; 3561 3562 3563 template<typename T, typename Traits> 3564 ProducerToken::ProducerToken(ConcurrentQueue<T, Traits>& queue) 3565 : producer(queue.recycle_or_create_producer(true)) 3566 { 3567 if (producer != nullptr) { 3568 producer->token = this; 3569 } 3570 } 3571 3572 template<typename T, typename Traits> 3573 ProducerToken::ProducerToken(BlockingConcurrentQueue<T, Traits>& queue) 3574 : producer(reinterpret_cast<ConcurrentQueue<T, Traits>*>(&queue)->recycle_or_create_producer(true)) 3575 { 3576 if (producer != nullptr) { 3577 producer->token = this; 3578 } 3579 } 3580 3581 template<typename T, typename Traits> 3582 ConsumerToken::ConsumerToken(ConcurrentQueue<T, Traits>& queue) 3583 : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr) 3584 { 3585 initialOffset = queue.nextExplicitConsumerId.fetch_add(1, std::memory_order_release); 3586 lastKnownGlobalOffset = -1; 3587 } 3588 3589 template<typename T, typename Traits> 3590 ConsumerToken::ConsumerToken(BlockingConcurrentQueue<T, Traits>& queue) 3591 : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr) 3592 { 3593 initialOffset = reinterpret_cast<ConcurrentQueue<T, Traits>*>(&queue)->nextExplicitConsumerId.fetch_add(1, std::memory_order_release); 3594 lastKnownGlobalOffset = -1; 3595 } 3596 3597 template<typename T, typename Traits> 3598 inline void swap(ConcurrentQueue<T, Traits>& a, ConcurrentQueue<T, Traits>& b) MOODYCAMEL_NOEXCEPT 3599 { 3600 a.swap(b); 3601 } 3602 3603 inline void swap(ProducerToken& a, ProducerToken& b) MOODYCAMEL_NOEXCEPT 3604 { 3605 a.swap(b); 3606 } 3607 3608 inline void swap(ConsumerToken& a, ConsumerToken& b) MOODYCAMEL_NOEXCEPT 3609 { 3610 a.swap(b); 3611 } 3612 3613 template<typename T, typename Traits> 3614 inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& a, typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& b) MOODYCAMEL_NOEXCEPT 3615 { 3616 a.swap(b); 3617 } 3618 3619 } 3620 3621 #if defined(__GNUC__) 3622 #pragma GCC diagnostic pop 3623 #endif 3624
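// ----------------------------------------------------------------------------
// Example usage (an illustrative sketch of the public interface only):
//
//   #include "concurrentqueue.h"
//   #include <cstdio>
//
//   int main()
//   {
//       moodycamel::ConcurrentQueue<int> q;
//
//       // Implicit producers: any thread can enqueue directly; a per-thread producer
//       // is created (and later recycled) behind the scenes, keyed on the thread ID.
//       q.enqueue(17);
//
//       // Bulk operations amortize the atomic and block-index work over many items.
//       int in[32];
//       for (int i = 0; i != 32; ++i) in[i] = i;
//       q.enqueue_bulk(in, 32);
//
//       int out[33];
//       std::size_t n = q.try_dequeue_bulk(out, 33);
//       std::printf("dequeued %zu items\n", n);
//
//       // Tokens cache producer/consumer state so repeated calls skip the thread-ID
//       // hash lookup and the consumer's producer-rotation bookkeeping.
//       moodycamel::ProducerToken ptok(q);
//       q.enqueue(ptok, 42);
//       moodycamel::ConsumerToken ctok(q);
//       int item;
//       while (q.try_dequeue(ctok, item)) { /* use item */ }
//       return 0;
//   }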