1 //! Queue that plays sounds one after the other.
2 
3 use std::sync::atomic::{AtomicBool, Ordering};
4 use std::sync::mpsc::{Receiver, Sender};
5 use std::sync::{mpsc, Arc, Mutex};
6 use std::time::Duration;
7 
8 use crate::source::{Empty, Source, Zero};
9 use crate::Sample;
10 
11 /// Builds a new queue. It consists of an input and an output.
12 ///
13 /// The input can be used to add sounds to the end of the queue, while the output implements
14 /// `Source` and plays the sounds.
15 ///
16 /// The parameter indicates how the queue should behave if the queue becomes empty:
17 ///
18 /// - If you pass `true`, then the queue is infinite and will play a silence instead until you add
19 ///   a new sound.
20 /// - If you pass `false`, then the queue will report that it has finished playing.
21 ///
queue<S>(keep_alive_if_empty: bool) -> (Arc<SourcesQueueInput<S>>, SourcesQueueOutput<S>) where S: Sample + Send + 'static,22 pub fn queue<S>(keep_alive_if_empty: bool) -> (Arc<SourcesQueueInput<S>>, SourcesQueueOutput<S>)
23 where
24     S: Sample + Send + 'static,
25 {
26     let input = Arc::new(SourcesQueueInput {
27         next_sounds: Mutex::new(Vec::new()),
28         keep_alive_if_empty: AtomicBool::new(keep_alive_if_empty),
29     });
30 
31     let output = SourcesQueueOutput {
32         current: Box::new(Empty::<S>::new()) as Box<_>,
33         signal_after_end: None,
34         input: input.clone(),
35     };
36 
37     (input, output)
38 }
39 
40 // TODO: consider reimplementing this with `from_factory`
41 
42 /// The input of the queue.
43 pub struct SourcesQueueInput<S> {
44     next_sounds: Mutex<Vec<(Box<dyn Source<Item = S> + Send>, Option<Sender<()>>)>>,
45 
46     // See constructor.
47     keep_alive_if_empty: AtomicBool,
48 }
49 
50 impl<S> SourcesQueueInput<S>
51 where
52     S: Sample + Send + 'static,
53 {
54     /// Adds a new source to the end of the queue.
55     #[inline]
append<T>(&self, source: T) where T: Source<Item = S> + Send + 'static,56     pub fn append<T>(&self, source: T)
57     where
58         T: Source<Item = S> + Send + 'static,
59     {
60         self.next_sounds
61             .lock()
62             .unwrap()
63             .push((Box::new(source) as Box<_>, None));
64     }
65 
66     /// Adds a new source to the end of the queue.
67     ///
68     /// The `Receiver` will be signalled when the sound has finished playing.
69     #[inline]
append_with_signal<T>(&self, source: T) -> Receiver<()> where T: Source<Item = S> + Send + 'static,70     pub fn append_with_signal<T>(&self, source: T) -> Receiver<()>
71     where
72         T: Source<Item = S> + Send + 'static,
73     {
74         let (tx, rx) = mpsc::channel();
75         self.next_sounds
76             .lock()
77             .unwrap()
78             .push((Box::new(source) as Box<_>, Some(tx)));
79         rx
80     }
81 
82     /// Sets whether the queue stays alive if there's no more sound to play.
83     ///
84     /// See also the constructor.
set_keep_alive_if_empty(&self, keep_alive_if_empty: bool)85     pub fn set_keep_alive_if_empty(&self, keep_alive_if_empty: bool) {
86         self.keep_alive_if_empty
87             .store(keep_alive_if_empty, Ordering::Release);
88     }
89 }
90 
91 /// The output of the queue. Implements `Source`.
92 pub struct SourcesQueueOutput<S> {
93     // The current iterator that produces samples.
94     current: Box<dyn Source<Item = S> + Send>,
95 
96     // Signal this sender before picking from `next`.
97     signal_after_end: Option<Sender<()>>,
98 
99     // The next sounds.
100     input: Arc<SourcesQueueInput<S>>,
101 }
102 
103 impl<S> Source for SourcesQueueOutput<S>
104 where
105     S: Sample + Send + 'static,
106 {
107     #[inline]
current_frame_len(&self) -> Option<usize>108     fn current_frame_len(&self) -> Option<usize> {
109         // This function is non-trivial because the boundary between two sounds in the queue should
110         // be a frame boundary as well.
111         //
112         // The current sound is free to return `None` for `current_frame_len()`, in which case
113         // we *should* return the number of samples remaining the current sound.
114         // This can be estimated with `size_hint()`.
115         //
116         // If the `size_hint` is `None` as well, we are in the worst case scenario. To handle this
117         // situation we force a frame to have a maximum number of samples indicate by this
118         // constant.
119         const THRESHOLD: usize = 512;
120 
121         // Try the current `current_frame_len`.
122         if let Some(val) = self.current.current_frame_len() {
123             if val != 0 {
124                 return Some(val);
125             }
126         }
127 
128         // Try the size hint.
129         let (lower_bound, _) = self.current.size_hint();
130         // The iterator default implementation just returns 0.
131         // That's a problematic value, so skip it.
132         if lower_bound > 0 {
133             return Some(lower_bound);
134         }
135 
136         // Otherwise we use the constant value.
137         Some(THRESHOLD)
138     }
139 
140     #[inline]
channels(&self) -> u16141     fn channels(&self) -> u16 {
142         self.current.channels()
143     }
144 
145     #[inline]
sample_rate(&self) -> u32146     fn sample_rate(&self) -> u32 {
147         self.current.sample_rate()
148     }
149 
150     #[inline]
total_duration(&self) -> Option<Duration>151     fn total_duration(&self) -> Option<Duration> {
152         None
153     }
154 }
155 
156 impl<S> Iterator for SourcesQueueOutput<S>
157 where
158     S: Sample + Send + 'static,
159 {
160     type Item = S;
161 
162     #[inline]
next(&mut self) -> Option<S>163     fn next(&mut self) -> Option<S> {
164         loop {
165             // Basic situation that will happen most of the time.
166             if let Some(sample) = self.current.next() {
167                 return Some(sample);
168             }
169 
170             // Since `self.current` has finished, we need to pick the next sound.
171             // In order to avoid inlining this expensive operation, the code is in another function.
172             if self.go_next().is_err() {
173                 return None;
174             }
175         }
176     }
177 
178     #[inline]
size_hint(&self) -> (usize, Option<usize>)179     fn size_hint(&self) -> (usize, Option<usize>) {
180         (self.current.size_hint().0, None)
181     }
182 }
183 
184 impl<S> SourcesQueueOutput<S>
185 where
186     S: Sample + Send + 'static,
187 {
188     // Called when `current` is empty and we must jump to the next element.
189     // Returns `Ok` if the sound should continue playing, or an error if it should stop.
190     //
191     // This method is separate so that it is not inlined.
go_next(&mut self) -> Result<(), ()>192     fn go_next(&mut self) -> Result<(), ()> {
193         if let Some(signal_after_end) = self.signal_after_end.take() {
194             let _ = signal_after_end.send(());
195         }
196 
197         let (next, signal_after_end) = {
198             let mut next = self.input.next_sounds.lock().unwrap();
199 
200             if next.len() == 0 {
201                 if self.input.keep_alive_if_empty.load(Ordering::Acquire) {
202                     // Play a short silence in order to avoid spinlocking.
203                     let silence = Zero::<S>::new(1, 44100); // TODO: meh
204                     (
205                         Box::new(silence.take_duration(Duration::from_millis(10))) as Box<_>,
206                         None,
207                     )
208                 } else {
209                     return Err(());
210                 }
211             } else {
212                 next.remove(0)
213             }
214         };
215 
216         self.current = next;
217         self.signal_after_end = signal_after_end;
218         Ok(())
219     }
220 }
221 
222 #[cfg(test)]
223 mod tests {
224     use crate::buffer::SamplesBuffer;
225     use crate::queue;
226     use crate::source::Source;
227 
228     #[test]
229     #[ignore] // FIXME: samples rate and channel not updated immediately after transition
basic()230     fn basic() {
231         let (tx, mut rx) = queue::queue(false);
232 
233         tx.append(SamplesBuffer::new(1, 48000, vec![10i16, -10, 10, -10]));
234         tx.append(SamplesBuffer::new(2, 96000, vec![5i16, 5, 5, 5]));
235 
236         assert_eq!(rx.channels(), 1);
237         assert_eq!(rx.sample_rate(), 48000);
238         assert_eq!(rx.next(), Some(10));
239         assert_eq!(rx.next(), Some(-10));
240         assert_eq!(rx.next(), Some(10));
241         assert_eq!(rx.next(), Some(-10));
242         assert_eq!(rx.channels(), 2);
243         assert_eq!(rx.sample_rate(), 96000);
244         assert_eq!(rx.next(), Some(5));
245         assert_eq!(rx.next(), Some(5));
246         assert_eq!(rx.next(), Some(5));
247         assert_eq!(rx.next(), Some(5));
248         assert_eq!(rx.next(), None);
249     }
250 
251     #[test]
immediate_end()252     fn immediate_end() {
253         let (_, mut rx) = queue::queue::<i16>(false);
254         assert_eq!(rx.next(), None);
255     }
256 
257     #[test]
keep_alive()258     fn keep_alive() {
259         let (tx, mut rx) = queue::queue(true);
260         tx.append(SamplesBuffer::new(1, 48000, vec![10i16, -10, 10, -10]));
261 
262         assert_eq!(rx.next(), Some(10));
263         assert_eq!(rx.next(), Some(-10));
264         assert_eq!(rx.next(), Some(10));
265         assert_eq!(rx.next(), Some(-10));
266 
267         for _ in 0..100000 {
268             assert_eq!(rx.next(), Some(0));
269         }
270     }
271 
272     #[test]
273     #[ignore] // TODO: not yet implemented
no_delay_when_added()274     fn no_delay_when_added() {
275         let (tx, mut rx) = queue::queue(true);
276 
277         for _ in 0..500 {
278             assert_eq!(rx.next(), Some(0));
279         }
280 
281         tx.append(SamplesBuffer::new(1, 48000, vec![10i16, -10, 10, -10]));
282         assert_eq!(rx.next(), Some(10));
283         assert_eq!(rx.next(), Some(-10));
284         assert_eq!(rx.next(), Some(10));
285         assert_eq!(rx.next(), Some(-10));
286     }
287 }
288