1 /* 2 * Copyright © 2016 Mozilla Foundation 3 * 4 * This program is made available under an ISC-style license. See the 5 * accompanying file LICENSE for details. 6 */ 7 8 #ifndef CUBEB_RING_BUFFER_H 9 #define CUBEB_RING_BUFFER_H 10 11 #include "cubeb_utils.h" 12 #include <algorithm> 13 #include <atomic> 14 #include <cstdint> 15 #include <memory> 16 #include <thread> 17 18 /** 19 * Single producer single consumer lock-free and wait-free ring buffer. 20 * 21 * This data structure allows producing data from one thread, and consuming it on 22 * another thread, safely and without explicit synchronization. If used on two 23 * threads, this data structure uses atomics for thread safety. It is possible 24 * to disable the use of atomics at compile time and only use this data 25 * structure on one thread. 26 * 27 * The role for the producer and the consumer must be constant, i.e., the 28 * producer should always be on one thread and the consumer should always be on 29 * another thread. 30 * 31 * Some words about the inner workings of this class: 32 * - Capacity is fixed. Only one allocation is performed, in the constructor. 33 * When reading and writing, the return value of the method allows checking if 34 * the ring buffer is empty or full. 35 * - We always keep the read index at least one element ahead of the write 36 * index, so we can distinguish between an empty and a full ring buffer: an 37 * empty ring buffer is when the write index is at the same position as the 38 * read index. A full buffer is when the write index is exactly one position 39 * before the read index. 40 * - We synchronize updates to the read index after having read the data, and 41 * the write index after having written the data. This means that the each 42 * thread can only touch a portion of the buffer that is not touched by the 43 * other thread. 44 * - Callers are expected to provide buffers. When writing to the queue, 45 * elements are copied into the internal storage from the buffer passed in. 46 * When reading from the queue, the user is expected to provide a buffer. 47 * Because this is a ring buffer, data might not be contiguous in memory, 48 * providing an external buffer to copy into is an easy way to have linear 49 * data for further processing. 50 */ 51 template <typename T> 52 class ring_buffer_base 53 { 54 public: 55 /** 56 * Constructor for a ring buffer. 57 * 58 * This performs an allocation, but is the only allocation that will happen 59 * for the life time of a `ring_buffer_base`. 60 * 61 * @param capacity The maximum number of element this ring buffer will hold. 62 */ ring_buffer_base(int capacity)63 ring_buffer_base(int capacity) 64 /* One more element to distinguish from empty and full buffer. */ 65 : capacity_(capacity + 1) 66 { 67 assert(storage_capacity() < 68 std::numeric_limits<int>::max() / 2 && 69 "buffer too large for the type of index used."); 70 assert(capacity_ > 0); 71 72 data_.reset(new T[storage_capacity()]); 73 /* If this queue is using atomics, initializing those members as the last 74 * action in the constructor acts as a full barrier, and allow capacity() to 75 * be thread-safe. */ 76 write_index_ = 0; 77 read_index_ = 0; 78 } 79 /** 80 * Push `count` zero or default constructed elements in the array. 81 * 82 * Only safely called on the producer thread. 83 * 84 * @param count The number of elements to enqueue. 85 * @return The number of element enqueued. 86 */ enqueue_default(int count)87 int enqueue_default(int count) 88 { 89 return enqueue(nullptr, count); 90 } 91 /** 92 * @brief Put an element in the queue 93 * 94 * Only safely called on the producer thread. 95 * 96 * @param element The element to put in the queue. 97 * 98 * @return 1 if the element was inserted, 0 otherwise. 99 */ enqueue(T & element)100 int enqueue(T& element) 101 { 102 return enqueue(&element, 1); 103 } 104 /** 105 * Push `count` elements in the ring buffer. 106 * 107 * Only safely called on the producer thread. 108 * 109 * @param elements a pointer to a buffer containing at least `count` elements. 110 * If `elements` is nullptr, zero or default constructed elements are enqueued. 111 * @param count The number of elements to read from `elements` 112 * @return The number of elements successfully coped from `elements` and inserted 113 * into the ring buffer. 114 */ enqueue(T * elements,int count)115 int enqueue(T * elements, int count) 116 { 117 #ifndef NDEBUG 118 assert_correct_thread(producer_id); 119 #endif 120 121 int rd_idx = read_index_.load(std::memory_order::memory_order_relaxed); 122 int wr_idx = write_index_.load(std::memory_order::memory_order_relaxed); 123 124 if (full_internal(rd_idx, wr_idx)) { 125 return 0; 126 } 127 128 int to_write = 129 std::min(available_write_internal(rd_idx, wr_idx), count); 130 131 /* First part, from the write index to the end of the array. */ 132 int first_part = std::min(storage_capacity() - wr_idx, 133 to_write); 134 /* Second part, from the beginning of the array */ 135 int second_part = to_write - first_part; 136 137 if (elements) { 138 Copy(data_.get() + wr_idx, elements, first_part); 139 Copy(data_.get(), elements + first_part, second_part); 140 } else { 141 ConstructDefault(data_.get() + wr_idx, first_part); 142 ConstructDefault(data_.get(), second_part); 143 } 144 145 write_index_.store(increment_index(wr_idx, to_write), std::memory_order::memory_order_release); 146 147 return to_write; 148 } 149 /** 150 * Retrieve at most `count` elements from the ring buffer, and copy them to 151 * `elements`, if non-null. 152 * 153 * Only safely called on the consumer side. 154 * 155 * @param elements A pointer to a buffer with space for at least `count` 156 * elements. If `elements` is `nullptr`, `count` element will be discarded. 157 * @param count The maximum number of elements to dequeue. 158 * @return The number of elements written to `elements`. 159 */ dequeue(T * elements,int count)160 int dequeue(T * elements, int count) 161 { 162 #ifndef NDEBUG 163 assert_correct_thread(consumer_id); 164 #endif 165 166 int wr_idx = write_index_.load(std::memory_order::memory_order_acquire); 167 int rd_idx = read_index_.load(std::memory_order::memory_order_relaxed); 168 169 if (empty_internal(rd_idx, wr_idx)) { 170 return 0; 171 } 172 173 int to_read = 174 std::min(available_read_internal(rd_idx, wr_idx), count); 175 176 int first_part = std::min(storage_capacity() - rd_idx, to_read); 177 int second_part = to_read - first_part; 178 179 if (elements) { 180 Copy(elements, data_.get() + rd_idx, first_part); 181 Copy(elements + first_part, data_.get(), second_part); 182 } 183 184 read_index_.store(increment_index(rd_idx, to_read), std::memory_order::memory_order_relaxed); 185 186 return to_read; 187 } 188 /** 189 * Get the number of available element for consuming. 190 * 191 * Only safely called on the consumer thread. 192 * 193 * @return The number of available elements for reading. 194 */ available_read()195 int available_read() const 196 { 197 #ifndef NDEBUG 198 assert_correct_thread(consumer_id); 199 #endif 200 return available_read_internal(read_index_.load(std::memory_order::memory_order_relaxed), 201 write_index_.load(std::memory_order::memory_order_relaxed)); 202 } 203 /** 204 * Get the number of available elements for consuming. 205 * 206 * Only safely called on the producer thread. 207 * 208 * @return The number of empty slots in the buffer, available for writing. 209 */ available_write()210 int available_write() const 211 { 212 #ifndef NDEBUG 213 assert_correct_thread(producer_id); 214 #endif 215 return available_write_internal(read_index_.load(std::memory_order::memory_order_relaxed), 216 write_index_.load(std::memory_order::memory_order_relaxed)); 217 } 218 /** 219 * Get the total capacity, for this ring buffer. 220 * 221 * Can be called safely on any thread. 222 * 223 * @return The maximum capacity of this ring buffer. 224 */ capacity()225 int capacity() const 226 { 227 return storage_capacity() - 1; 228 } 229 /** 230 * Reset the consumer and producer thread identifier, in case the thread are 231 * being changed. This has to be externally synchronized. This is no-op when 232 * asserts are disabled. 233 */ reset_thread_ids()234 void reset_thread_ids() 235 { 236 #ifndef NDEBUG 237 consumer_id = producer_id = std::thread::id(); 238 #endif 239 } 240 private: 241 /** Return true if the ring buffer is empty. 242 * 243 * @param read_index the read index to consider 244 * @param write_index the write index to consider 245 * @return true if the ring buffer is empty, false otherwise. 246 **/ empty_internal(int read_index,int write_index)247 bool empty_internal(int read_index, 248 int write_index) const 249 { 250 return write_index == read_index; 251 } 252 /** Return true if the ring buffer is full. 253 * 254 * This happens if the write index is exactly one element behind the read 255 * index. 256 * 257 * @param read_index the read index to consider 258 * @param write_index the write index to consider 259 * @return true if the ring buffer is full, false otherwise. 260 **/ full_internal(int read_index,int write_index)261 bool full_internal(int read_index, 262 int write_index) const 263 { 264 return (write_index + 1) % storage_capacity() == read_index; 265 } 266 /** 267 * Return the size of the storage. It is one more than the number of elements 268 * that can be stored in the buffer. 269 * 270 * @return the number of elements that can be stored in the buffer. 271 */ storage_capacity()272 int storage_capacity() const 273 { 274 return capacity_; 275 } 276 /** 277 * Returns the number of elements available for reading. 278 * 279 * @return the number of available elements for reading. 280 */ 281 int available_read_internal(int read_index,int write_index)282 available_read_internal(int read_index, 283 int write_index) const 284 { 285 if (write_index >= read_index) { 286 return write_index - read_index; 287 } else { 288 return write_index + storage_capacity() - read_index; 289 } 290 } 291 /** 292 * Returns the number of empty elements, available for writing. 293 * 294 * @return the number of elements that can be written into the array. 295 */ 296 int available_write_internal(int read_index,int write_index)297 available_write_internal(int read_index, 298 int write_index) const 299 { 300 /* We substract one element here to always keep at least one sample 301 * free in the buffer, to distinguish between full and empty array. */ 302 int rv = read_index - write_index - 1; 303 if (write_index >= read_index) { 304 rv += storage_capacity(); 305 } 306 return rv; 307 } 308 /** 309 * Increments an index, wrapping it around the storage. 310 * 311 * @param index a reference to the index to increment. 312 * @param increment the number by which `index` is incremented. 313 * @return the new index. 314 */ 315 int increment_index(int index,int increment)316 increment_index(int index, int increment) const 317 { 318 assert(increment >= 0); 319 return (index + increment) % storage_capacity(); 320 } 321 /** 322 * @brief This allows checking that enqueue (resp. dequeue) are always called 323 * by the right thread. 324 * 325 * @param id the id of the thread that has called the calling method first. 326 */ 327 #ifndef NDEBUG assert_correct_thread(std::thread::id & id)328 static void assert_correct_thread(std::thread::id& id) 329 { 330 if (id == std::thread::id()) { 331 id = std::this_thread::get_id(); 332 return; 333 } 334 assert(id == std::this_thread::get_id()); 335 } 336 #endif 337 /** Index at which the oldest element is at, in samples. */ 338 std::atomic<int> read_index_; 339 /** Index at which to write new elements. `write_index` is always at 340 * least one element ahead of `read_index_`. */ 341 std::atomic<int> write_index_; 342 /** Maximum number of elements that can be stored in the ring buffer. */ 343 const int capacity_; 344 /** Data storage */ 345 std::unique_ptr<T[]> data_; 346 #ifndef NDEBUG 347 /** The id of the only thread that is allowed to read from the queue. */ 348 mutable std::thread::id consumer_id; 349 /** The id of the only thread that is allowed to write from the queue. */ 350 mutable std::thread::id producer_id; 351 #endif 352 }; 353 354 /** 355 * Adapter for `ring_buffer_base` that exposes an interface in frames. 356 */ 357 template <typename T> 358 class audio_ring_buffer_base 359 { 360 public: 361 /** 362 * @brief Constructor. 363 * 364 * @param channel_count Number of channels. 365 * @param capacity_in_frames The capacity in frames. 366 */ audio_ring_buffer_base(int channel_count,int capacity_in_frames)367 audio_ring_buffer_base(int channel_count, int capacity_in_frames) 368 : channel_count(channel_count) 369 , ring_buffer(frames_to_samples(capacity_in_frames)) 370 { 371 assert(channel_count > 0); 372 } 373 /** 374 * @brief Enqueue silence. 375 * 376 * Only safely called on the producer thread. 377 * 378 * @param frame_count The number of frames of silence to enqueue. 379 * @return The number of frames of silence actually written to the queue. 380 */ enqueue_default(int frame_count)381 int enqueue_default(int frame_count) 382 { 383 return samples_to_frames(ring_buffer.enqueue(nullptr, frames_to_samples(frame_count))); 384 } 385 /** 386 * @brief Enqueue `frames_count` frames of audio. 387 * 388 * Only safely called from the producer thread. 389 * 390 * @param [in] frames If non-null, the frames to enqueue. 391 * Otherwise, silent frames are enqueued. 392 * @param frame_count The number of frames to enqueue. 393 * 394 * @return The number of frames enqueued 395 */ 396 enqueue(T * frames,int frame_count)397 int enqueue(T * frames, int frame_count) 398 { 399 return samples_to_frames(ring_buffer.enqueue(frames, frames_to_samples(frame_count))); 400 } 401 402 /** 403 * @brief Removes `frame_count` frames from the buffer, and 404 * write them to `frames` if it is non-null. 405 * 406 * Only safely called on the consumer thread. 407 * 408 * @param frames If non-null, the frames are copied to `frames`. 409 * Otherwise, they are dropped. 410 * @param frame_count The number of frames to remove. 411 * 412 * @return The number of frames actually dequeud. 413 */ dequeue(T * frames,int frame_count)414 int dequeue(T * frames, int frame_count) 415 { 416 return samples_to_frames(ring_buffer.dequeue(frames, frames_to_samples(frame_count))); 417 } 418 /** 419 * Get the number of available frames of audio for consuming. 420 * 421 * Only safely called on the consumer thread. 422 * 423 * @return The number of available frames of audio for reading. 424 */ available_read()425 int available_read() const 426 { 427 return samples_to_frames(ring_buffer.available_read()); 428 } 429 /** 430 * Get the number of available frames of audio for consuming. 431 * 432 * Only safely called on the producer thread. 433 * 434 * @return The number of empty slots in the buffer, available for writing. 435 */ available_write()436 int available_write() const 437 { 438 return samples_to_frames(ring_buffer.available_write()); 439 } 440 /** 441 * Get the total capacity, for this ring buffer. 442 * 443 * Can be called safely on any thread. 444 * 445 * @return The maximum capacity of this ring buffer. 446 */ capacity()447 int capacity() const 448 { 449 return samples_to_frames(ring_buffer.capacity()); 450 } 451 private: 452 /** 453 * @brief Frames to samples conversion. 454 * 455 * @param frames The number of frames. 456 * 457 * @return A number of samples. 458 */ frames_to_samples(int frames)459 int frames_to_samples(int frames) const 460 { 461 return frames * channel_count; 462 } 463 /** 464 * @brief Samples to frames conversion. 465 * 466 * @param samples The number of samples. 467 * 468 * @return A number of frames. 469 */ samples_to_frames(int samples)470 int samples_to_frames(int samples) const 471 { 472 return samples / channel_count; 473 } 474 /** Number of channels of audio that will stream through this ring buffer. */ 475 int channel_count; 476 /** The underlying ring buffer that is used to store the data. */ 477 ring_buffer_base<T> ring_buffer; 478 }; 479 480 /** 481 * Lock-free instantiation of the `ring_buffer_base` type. This is safe to use 482 * from two threads, one producer, one consumer (that never change role), 483 * without explicit synchronization. 484 */ 485 template<typename T> 486 using lock_free_queue = ring_buffer_base<T>; 487 /** 488 * Lock-free instantiation of the `audio_ring_buffer` type. This is safe to use 489 * from two threads, one producer, one consumer (that never change role), 490 * without explicit synchronization. 491 */ 492 template<typename T> 493 using lock_free_audio_ring_buffer = audio_ring_buffer_base<T>; 494 495 #endif // CUBEB_RING_BUFFER_H 496