/*
 * Copyright © 2016 Mozilla Foundation
 *
 * This program is made available under an ISC-style license.  See the
 * accompanying file LICENSE for details.
 */

#ifndef CUBEB_RING_BUFFER_H
#define CUBEB_RING_BUFFER_H

#include "cubeb_utils.h"
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <limits>
#include <memory>
#include <thread>

/**
 * Single producer single consumer lock-free and wait-free ring buffer.
 *
 * This data structure allows producing data from one thread, and consuming it
 * on another thread, safely and without explicit synchronization. If used on
 * two threads, this data structure uses atomics for thread safety. It is
 * possible to disable the use of atomics at compile time and only use this
 * data structure on one thread.
 *
 * The roles of producer and consumer must be constant, i.e., the producer
 * should always be on one thread and the consumer should always be on another
 * thread.
 *
 * Some words about the inner workings of this class:
 * - Capacity is fixed. Only one allocation is performed, in the constructor.
 *   When reading and writing, the return value of the method allows checking
 *   if the ring buffer is empty or full.
 * - We always keep at least one slot of the storage free, so we can
 *   distinguish between an empty and a full ring buffer: the buffer is empty
 *   when the write index is at the same position as the read index, and full
 *   when the write index is exactly one position before the read index.
 * - We publish updates to the read index only after having read the data, and
 *   to the write index only after having written the data. This means that
 *   each thread can only touch a portion of the buffer that is not touched by
 *   the other thread.
 * - Callers are expected to provide buffers. When writing to the queue,
 *   elements are copied into the internal storage from the buffer passed in.
 *   When reading from the queue, the user is expected to provide a buffer.
 *   Because this is a ring buffer, data might not be contiguous in memory;
 *   providing an external buffer to copy into is an easy way to have linear
 *   data for further processing.
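 *
 * A minimal usage sketch (illustrative only: the element type, sizes and the
 * single-threaded flow below are assumptions made for this example, not part
 * of the class itself):
 *
 * @code
 *   ring_buffer_base<float> rb(16);   // can hold up to 16 elements
 *   float in[4] = { 0.f, 1.f, 2.f, 3.f };
 *   int written = rb.enqueue(in, 4);  // 4: there is room for all elements
 *   float out[4];
 *   int read = rb.dequeue(out, 4);    // 4: copies the same elements to `out`
 * @endcode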
 */
template <typename T>
class ring_buffer_base
{
public:
  /**
   * Constructor for a ring buffer.
   *
   * This performs an allocation, but it is the only allocation that will
   * happen for the lifetime of a `ring_buffer_base`.
   *
   * @param capacity The maximum number of elements this ring buffer will hold.
   */
  ring_buffer_base(int capacity)
    /* One more element to distinguish between an empty and a full buffer. */
    : capacity_(capacity + 1)
  {
    assert(storage_capacity() < std::numeric_limits<int>::max() / 2 &&
           "buffer too large for the type of index used.");
    assert(capacity_ > 0);

    data_.reset(new T[storage_capacity()]);
    /* If this queue is using atomics, initializing those members as the last
     * action in the constructor acts as a full barrier, and allows capacity()
     * to be thread-safe. */
    write_index_ = 0;
    read_index_ = 0;
  }
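  /* Illustrative note: constructing `ring_buffer_base<int> rb(4)` allocates
   * storage for 5 elements; `rb.capacity()` still reports 4, and the extra
   * slot is only used to tell a full buffer apart from an empty one. */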
  /**
   * Push `count` zero-initialized or default-constructed elements into the
   * ring buffer.
   *
   * Only safely called on the producer thread.
   *
   * @param count The number of elements to enqueue.
   * @return The number of elements enqueued.
   */
  int enqueue_default(int count)
  {
    return enqueue(nullptr, count);
  }
  /**
   * @brief Put an element in the queue
   *
   * Only safely called on the producer thread.
   *
   * @param element The element to put in the queue.
   *
   * @return 1 if the element was inserted, 0 otherwise.
   */
  int enqueue(T& element)
  {
    return enqueue(&element, 1);
  }
  /**
   * Push `count` elements in the ring buffer.
   *
   * Only safely called on the producer thread.
   *
   * @param elements a pointer to a buffer containing at least `count` elements.
   * If `elements` is nullptr, zero-initialized or default-constructed elements
   * are enqueued.
   * @param count The number of elements to read from `elements`.
   * @return The number of elements successfully copied from `elements` and
   * inserted into the ring buffer.
   */
  int enqueue(T * elements, int count)
  {
#ifndef NDEBUG
    assert_correct_thread(producer_id);
#endif

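    /* Acquiring `read_index_` here pairs with the release store in
     * `dequeue()`: it guarantees the consumer has finished reading any slot
     * this call is about to overwrite. */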
    int rd_idx = read_index_.load(std::memory_order::memory_order_acquire);
    int wr_idx = write_index_.load(std::memory_order::memory_order_relaxed);

    if (full_internal(rd_idx, wr_idx)) {
      return 0;
    }

    int to_write = std::min(available_write_internal(rd_idx, wr_idx), count);

    /* First part, from the write index to the end of the array. */
    int first_part = std::min(storage_capacity() - wr_idx, to_write);
    /* Second part, from the beginning of the array. */
    int second_part = to_write - first_part;
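    /* Worked example with illustrative numbers: if storage_capacity() is 8,
     * wr_idx is 6 and to_write is 4, then first_part is 2 (slots 6 and 7) and
     * second_part is 2 (slots 0 and 1), wrapping around the storage. */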

    if (elements) {
      Copy(data_.get() + wr_idx, elements, first_part);
      Copy(data_.get(), elements + first_part, second_part);
    } else {
      ConstructDefault(data_.get() + wr_idx, first_part);
      ConstructDefault(data_.get(), second_part);
    }

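    /* Publish the newly written elements: the release store pairs with the
     * acquire load of `write_index_` in `dequeue()`. */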
    write_index_.store(increment_index(wr_idx, to_write),
                       std::memory_order::memory_order_release);

    return to_write;
  }
  /**
   * Retrieve at most `count` elements from the ring buffer, and copy them to
   * `elements`, if non-null.
   *
   * Only safely called on the consumer thread.
   *
   * @param elements A pointer to a buffer with space for at least `count`
   * elements. If `elements` is `nullptr`, `count` elements will be discarded.
   * @param count The maximum number of elements to dequeue.
   * @return The number of elements written to `elements`.
   */
  int dequeue(T * elements, int count)
  {
#ifndef NDEBUG
    assert_correct_thread(consumer_id);
#endif

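    /* Acquiring `write_index_` here pairs with the release store in
     * `enqueue()`: the elements the producer published are fully visible
     * before we copy them out. */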
    int wr_idx = write_index_.load(std::memory_order::memory_order_acquire);
    int rd_idx = read_index_.load(std::memory_order::memory_order_relaxed);

    if (empty_internal(rd_idx, wr_idx)) {
      return 0;
    }

    int to_read = std::min(available_read_internal(rd_idx, wr_idx), count);

    int first_part = std::min(storage_capacity() - rd_idx, to_read);
    int second_part = to_read - first_part;

    if (elements) {
      Copy(elements, data_.get() + rd_idx, first_part);
      Copy(elements + first_part, data_.get(), second_part);
    }

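    /* Publish the newly freed slots: the release store pairs with the acquire
     * load of `read_index_` in `enqueue()`, so the producer only reuses slots
     * that have been fully read. */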
    read_index_.store(increment_index(rd_idx, to_read),
                      std::memory_order::memory_order_release);

    return to_read;
  }
  /**
   * Get the number of available elements for consuming.
   *
   * Only safely called on the consumer thread.
   *
   * @return The number of available elements for reading.
   */
  int available_read() const
  {
#ifndef NDEBUG
    assert_correct_thread(consumer_id);
#endif
    return available_read_internal(read_index_.load(std::memory_order::memory_order_relaxed),
                                   write_index_.load(std::memory_order::memory_order_relaxed));
  }
  /**
   * Get the number of elements that can be written into the buffer.
   *
   * Only safely called on the producer thread.
   *
   * @return The number of empty slots in the buffer, available for writing.
   */
  int available_write() const
  {
#ifndef NDEBUG
    assert_correct_thread(producer_id);
#endif
    return available_write_internal(read_index_.load(std::memory_order::memory_order_relaxed),
                                    write_index_.load(std::memory_order::memory_order_relaxed));
  }
  /**
   * Get the total capacity of this ring buffer.
   *
   * Can be called safely on any thread.
   *
   * @return The maximum capacity of this ring buffer.
   */
  int capacity() const
  {
    return storage_capacity() - 1;
  }
  /**
   * Reset the consumer and producer thread identifiers, in case the threads
   * are being changed. This has to be externally synchronized. This is a
   * no-op when asserts are disabled.
   */
  void reset_thread_ids()
  {
#ifndef NDEBUG
    consumer_id = producer_id = std::thread::id();
#endif
  }
private:
  /** Return true if the ring buffer is empty.
   *
   * @param read_index the read index to consider
   * @param write_index the write index to consider
   * @return true if the ring buffer is empty, false otherwise.
   **/
  bool empty_internal(int read_index, int write_index) const
  {
    return write_index == read_index;
  }
  /** Return true if the ring buffer is full.
   *
   * This happens if the write index is exactly one element behind the read
   * index.
   *
   * @param read_index the read index to consider
   * @param write_index the write index to consider
   * @return true if the ring buffer is full, false otherwise.
   **/
  bool full_internal(int read_index, int write_index) const
  {
    return (write_index + 1) % storage_capacity() == read_index;
  }
  /**
   * Return the size of the storage. It is one more than the number of
   * elements that can be stored in the buffer at once.
   *
   * @return the size of the internal storage, in elements.
   */
  int storage_capacity() const
  {
    return capacity_;
  }
  /**
   * Returns the number of elements available for reading.
   *
   * @return the number of available elements for reading.
   */
  int available_read_internal(int read_index, int write_index) const
  {
    if (write_index >= read_index) {
      return write_index - read_index;
    } else {
      return write_index + storage_capacity() - read_index;
    }
  }
  /**
   * Returns the number of empty elements, available for writing.
   *
   * @return the number of elements that can be written into the array.
   */
  int available_write_internal(int read_index, int write_index) const
  {
    /* We subtract one element here to always keep at least one slot free in
     * the buffer, to distinguish between a full and an empty array. */
    int rv = read_index - write_index - 1;
    if (write_index >= read_index) {
      rv += storage_capacity();
    }
    return rv;
  }
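  /* Worked example with illustrative numbers for the two helpers above: with
   * storage_capacity() == 8, read_index == 6 and write_index == 2, reading can
   * return 2 + 8 - 6 == 4 elements, and writing can fill 6 - 2 - 1 == 3 slots,
   * leaving one slot unused so that a full buffer stays distinguishable from
   * an empty one. */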
  /**
   * Increments an index, wrapping it around the storage.
   *
   * @param index the index to increment.
   * @param increment the number by which `index` is incremented.
   * @return the new index.
   */
  int increment_index(int index, int increment) const
  {
    assert(increment >= 0);
    return (index + increment) % storage_capacity();
  }
  /**
   * @brief This allows checking that enqueue (resp. dequeue) is always called
   * by the same thread.
   *
   * @param id the thread identifier recorded the first time the calling
   * method was invoked.
   */
#ifndef NDEBUG
  static void assert_correct_thread(std::thread::id& id)
  {
    if (id == std::thread::id()) {
      id = std::this_thread::get_id();
      return;
    }
    assert(id == std::this_thread::get_id());
  }
#endif
  /** Index at which the oldest element can be read. */
  std::atomic<int> read_index_;
  /** Index at which to write new elements. The buffer is considered full when
   * `write_index_` is exactly one slot behind `read_index_`, so at least one
   * slot of the storage always stays unused. */
  std::atomic<int> write_index_;
  /** Size of the internal storage: one more than the maximum number of
   * elements this ring buffer can hold. */
  const int capacity_;
  /** Data storage */
  std::unique_ptr<T[]> data_;
#ifndef NDEBUG
  /** The id of the only thread that is allowed to read from the queue. */
  mutable std::thread::id consumer_id;
  /** The id of the only thread that is allowed to write to the queue. */
  mutable std::thread::id producer_id;
#endif
};

/**
 * Adapter for `ring_buffer_base` that exposes an interface in frames.
 */
template <typename T>
class audio_ring_buffer_base
{
public:
  /**
   * @brief Constructor.
   *
   * @param channel_count       Number of channels.
   * @param capacity_in_frames  The capacity in frames.
   */
  audio_ring_buffer_base(int channel_count, int capacity_in_frames)
    : channel_count(channel_count)
    , ring_buffer(frames_to_samples(capacity_in_frames))
  {
    assert(channel_count > 0);
  }
  /**
   * @brief Enqueue silence.
   *
   * Only safely called on the producer thread.
   *
   * @param frame_count The number of frames of silence to enqueue.
   * @return  The number of frames of silence actually written to the queue.
   */
  int enqueue_default(int frame_count)
  {
    return samples_to_frames(ring_buffer.enqueue(nullptr, frames_to_samples(frame_count)));
  }
  /**
   * @brief Enqueue `frame_count` frames of audio.
   *
   * Only safely called from the producer thread.
   *
   * @param [in] frames If non-null, the frames to enqueue.
   *                    Otherwise, silent frames are enqueued.
   * @param frame_count The number of frames to enqueue.
   *
   * @return The number of frames enqueued.
   */
  int enqueue(T * frames, int frame_count)
  {
    return samples_to_frames(ring_buffer.enqueue(frames, frames_to_samples(frame_count)));
  }

  /**
   * @brief Removes `frame_count` frames from the buffer, and writes them to
   *        `frames` if it is non-null.
   *
   * Only safely called on the consumer thread.
   *
   * @param frames      If non-null, the frames are copied to `frames`.
   *                    Otherwise, they are dropped.
   * @param frame_count The number of frames to remove.
   *
   * @return  The number of frames actually dequeued.
   */
  int dequeue(T * frames, int frame_count)
  {
    return samples_to_frames(ring_buffer.dequeue(frames, frames_to_samples(frame_count)));
  }
  /**
   * Get the number of available frames of audio for consuming.
   *
   * Only safely called on the consumer thread.
   *
   * @return The number of available frames of audio for reading.
   */
  int available_read() const
  {
    return samples_to_frames(ring_buffer.available_read());
  }
  /**
   * Get the number of frames of audio that can be written into the buffer.
   *
   * Only safely called on the producer thread.
   *
   * @return The number of empty slots in the buffer, available for writing.
   */
  int available_write() const
  {
    return samples_to_frames(ring_buffer.available_write());
  }
  /**
   * Get the total capacity of this ring buffer.
   *
   * Can be called safely on any thread.
   *
   * @return The maximum capacity of this ring buffer, in frames.
   */
  int capacity() const
  {
    return samples_to_frames(ring_buffer.capacity());
  }
private:
  /**
   * @brief Frames to samples conversion.
   *
   * @param frames The number of frames.
   *
   * @return  A number of samples.
   */
  int frames_to_samples(int frames) const
  {
    return frames * channel_count;
  }
  /**
   * @brief Samples to frames conversion.
   *
   * @param samples The number of samples.
   *
   * @return  A number of frames.
   */
  int samples_to_frames(int samples) const
  {
    return samples / channel_count;
  }
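  /* Worked example with illustrative numbers: for stereo audio
   * (channel_count == 2), 128 frames correspond to 256 samples, and dequeuing
   * 256 samples from the underlying ring buffer yields 128 frames. */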
  /** Number of channels of audio that will stream through this ring buffer. */
  int channel_count;
  /** The underlying ring buffer that is used to store the data. */
  ring_buffer_base<T> ring_buffer;
};

/**
 * Lock-free instantiation of the `ring_buffer_base` type. This is safe to use
 * from two threads, one producer, one consumer (that never change roles),
 * without explicit synchronization.
 */
template <typename T>
using lock_free_queue = ring_buffer_base<T>;
/**
 * Lock-free instantiation of the `audio_ring_buffer_base` type. This is safe
 * to use from two threads, one producer, one consumer (that never change
 * roles), without explicit synchronization.
 */
template <typename T>
using lock_free_audio_ring_buffer = audio_ring_buffer_base<T>;
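
/*
 * Usage sketch (illustrative only, not part of the cubeb API): a producer
 * thread pushes stereo frames while a consumer thread drains them. The
 * element type, sizes and thread handling here are assumptions made for the
 * example.
 *
 *   lock_free_audio_ring_buffer<float> buf(2, 1024); // 2 channels, 1024 frames
 *
 *   std::thread producer([&buf] {
 *     float frames[2 * 128] = {};              // 128 silent stereo frames
 *     int written = buf.enqueue(frames, 128);  // <= 128, depending on space
 *     (void)written;
 *   });
 *
 *   std::thread consumer([&buf] {
 *     float frames[2 * 128];
 *     int read = buf.dequeue(frames, 128);     // <= 128, depending on data
 *     (void)read;
 *   });
 *
 *   producer.join();
 *   consumer.join();
 */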

#endif // CUBEB_RING_BUFFER_H