1 //! Queue that plays sounds one after the other.
2
3 use std::sync::atomic::{AtomicBool, Ordering};
4 use std::sync::mpsc::{Receiver, Sender};
5 use std::sync::{mpsc, Arc, Mutex};
6 use std::time::Duration;
7
8 use crate::source::{Empty, Source, Zero};
9 use crate::Sample;
10
11 /// Builds a new queue. It consists of an input and an output.
12 ///
13 /// The input can be used to add sounds to the end of the queue, while the output implements
14 /// `Source` and plays the sounds.
15 ///
16 /// The parameter indicates how the queue should behave if the queue becomes empty:
17 ///
18 /// - If you pass `true`, then the queue is infinite and will play a silence instead until you add
19 /// a new sound.
20 /// - If you pass `false`, then the queue will report that it has finished playing.
21 ///
queue<S>(keep_alive_if_empty: bool) -> (Arc<SourcesQueueInput<S>>, SourcesQueueOutput<S>) where S: Sample + Send + 'static,22 pub fn queue<S>(keep_alive_if_empty: bool) -> (Arc<SourcesQueueInput<S>>, SourcesQueueOutput<S>)
23 where
24 S: Sample + Send + 'static,
25 {
26 let input = Arc::new(SourcesQueueInput {
27 next_sounds: Mutex::new(Vec::new()),
28 keep_alive_if_empty: AtomicBool::new(keep_alive_if_empty),
29 });
30
31 let output = SourcesQueueOutput {
32 current: Box::new(Empty::<S>::new()) as Box<_>,
33 signal_after_end: None,
34 input: input.clone(),
35 };
36
37 (input, output)
38 }
39
40 // TODO: consider reimplementing this with `from_factory`
41
42 /// The input of the queue.
43 pub struct SourcesQueueInput<S> {
44 next_sounds: Mutex<Vec<(Box<dyn Source<Item = S> + Send>, Option<Sender<()>>)>>,
45
46 // See constructor.
47 keep_alive_if_empty: AtomicBool,
48 }
49
50 impl<S> SourcesQueueInput<S>
51 where
52 S: Sample + Send + 'static,
53 {
54 /// Adds a new source to the end of the queue.
55 #[inline]
append<T>(&self, source: T) where T: Source<Item = S> + Send + 'static,56 pub fn append<T>(&self, source: T)
57 where
58 T: Source<Item = S> + Send + 'static,
59 {
60 self.next_sounds
61 .lock()
62 .unwrap()
63 .push((Box::new(source) as Box<_>, None));
64 }
65
66 /// Adds a new source to the end of the queue.
67 ///
68 /// The `Receiver` will be signalled when the sound has finished playing.
69 #[inline]
append_with_signal<T>(&self, source: T) -> Receiver<()> where T: Source<Item = S> + Send + 'static,70 pub fn append_with_signal<T>(&self, source: T) -> Receiver<()>
71 where
72 T: Source<Item = S> + Send + 'static,
73 {
74 let (tx, rx) = mpsc::channel();
75 self.next_sounds
76 .lock()
77 .unwrap()
78 .push((Box::new(source) as Box<_>, Some(tx)));
79 rx
80 }
81
82 /// Sets whether the queue stays alive if there's no more sound to play.
83 ///
84 /// See also the constructor.
set_keep_alive_if_empty(&self, keep_alive_if_empty: bool)85 pub fn set_keep_alive_if_empty(&self, keep_alive_if_empty: bool) {
86 self.keep_alive_if_empty
87 .store(keep_alive_if_empty, Ordering::Release);
88 }
89 }
90
91 /// The output of the queue. Implements `Source`.
92 pub struct SourcesQueueOutput<S> {
93 // The current iterator that produces samples.
94 current: Box<dyn Source<Item = S> + Send>,
95
96 // Signal this sender before picking from `next`.
97 signal_after_end: Option<Sender<()>>,
98
99 // The next sounds.
100 input: Arc<SourcesQueueInput<S>>,
101 }
102
103 impl<S> Source for SourcesQueueOutput<S>
104 where
105 S: Sample + Send + 'static,
106 {
107 #[inline]
current_frame_len(&self) -> Option<usize>108 fn current_frame_len(&self) -> Option<usize> {
109 // This function is non-trivial because the boundary between two sounds in the queue should
110 // be a frame boundary as well.
111 //
112 // The current sound is free to return `None` for `current_frame_len()`, in which case
113 // we *should* return the number of samples remaining the current sound.
114 // This can be estimated with `size_hint()`.
115 //
116 // If the `size_hint` is `None` as well, we are in the worst case scenario. To handle this
117 // situation we force a frame to have a maximum number of samples indicate by this
118 // constant.
119 const THRESHOLD: usize = 512;
120
121 // Try the current `current_frame_len`.
122 if let Some(val) = self.current.current_frame_len() {
123 if val != 0 {
124 return Some(val);
125 }
126 }
127
128 // Try the size hint.
129 let (lower_bound, _) = self.current.size_hint();
130 // The iterator default implementation just returns 0.
131 // That's a problematic value, so skip it.
132 if lower_bound > 0 {
133 return Some(lower_bound);
134 }
135
136 // Otherwise we use the constant value.
137 Some(THRESHOLD)
138 }
139
140 #[inline]
channels(&self) -> u16141 fn channels(&self) -> u16 {
142 self.current.channels()
143 }
144
145 #[inline]
sample_rate(&self) -> u32146 fn sample_rate(&self) -> u32 {
147 self.current.sample_rate()
148 }
149
150 #[inline]
total_duration(&self) -> Option<Duration>151 fn total_duration(&self) -> Option<Duration> {
152 None
153 }
154 }
155
156 impl<S> Iterator for SourcesQueueOutput<S>
157 where
158 S: Sample + Send + 'static,
159 {
160 type Item = S;
161
162 #[inline]
next(&mut self) -> Option<S>163 fn next(&mut self) -> Option<S> {
164 loop {
165 // Basic situation that will happen most of the time.
166 if let Some(sample) = self.current.next() {
167 return Some(sample);
168 }
169
170 // Since `self.current` has finished, we need to pick the next sound.
171 // In order to avoid inlining this expensive operation, the code is in another function.
172 if self.go_next().is_err() {
173 return None;
174 }
175 }
176 }
177
178 #[inline]
size_hint(&self) -> (usize, Option<usize>)179 fn size_hint(&self) -> (usize, Option<usize>) {
180 (self.current.size_hint().0, None)
181 }
182 }
183
184 impl<S> SourcesQueueOutput<S>
185 where
186 S: Sample + Send + 'static,
187 {
188 // Called when `current` is empty and we must jump to the next element.
189 // Returns `Ok` if the sound should continue playing, or an error if it should stop.
190 //
191 // This method is separate so that it is not inlined.
go_next(&mut self) -> Result<(), ()>192 fn go_next(&mut self) -> Result<(), ()> {
193 if let Some(signal_after_end) = self.signal_after_end.take() {
194 let _ = signal_after_end.send(());
195 }
196
197 let (next, signal_after_end) = {
198 let mut next = self.input.next_sounds.lock().unwrap();
199
200 if next.len() == 0 {
201 if self.input.keep_alive_if_empty.load(Ordering::Acquire) {
202 // Play a short silence in order to avoid spinlocking.
203 let silence = Zero::<S>::new(1, 44100); // TODO: meh
204 (
205 Box::new(silence.take_duration(Duration::from_millis(10))) as Box<_>,
206 None,
207 )
208 } else {
209 return Err(());
210 }
211 } else {
212 next.remove(0)
213 }
214 };
215
216 self.current = next;
217 self.signal_after_end = signal_after_end;
218 Ok(())
219 }
220 }
221
222 #[cfg(test)]
223 mod tests {
224 use crate::buffer::SamplesBuffer;
225 use crate::queue;
226 use crate::source::Source;
227
228 #[test]
229 #[ignore] // FIXME: samples rate and channel not updated immediately after transition
basic()230 fn basic() {
231 let (tx, mut rx) = queue::queue(false);
232
233 tx.append(SamplesBuffer::new(1, 48000, vec![10i16, -10, 10, -10]));
234 tx.append(SamplesBuffer::new(2, 96000, vec![5i16, 5, 5, 5]));
235
236 assert_eq!(rx.channels(), 1);
237 assert_eq!(rx.sample_rate(), 48000);
238 assert_eq!(rx.next(), Some(10));
239 assert_eq!(rx.next(), Some(-10));
240 assert_eq!(rx.next(), Some(10));
241 assert_eq!(rx.next(), Some(-10));
242 assert_eq!(rx.channels(), 2);
243 assert_eq!(rx.sample_rate(), 96000);
244 assert_eq!(rx.next(), Some(5));
245 assert_eq!(rx.next(), Some(5));
246 assert_eq!(rx.next(), Some(5));
247 assert_eq!(rx.next(), Some(5));
248 assert_eq!(rx.next(), None);
249 }
250
251 #[test]
immediate_end()252 fn immediate_end() {
253 let (_, mut rx) = queue::queue::<i16>(false);
254 assert_eq!(rx.next(), None);
255 }
256
257 #[test]
keep_alive()258 fn keep_alive() {
259 let (tx, mut rx) = queue::queue(true);
260 tx.append(SamplesBuffer::new(1, 48000, vec![10i16, -10, 10, -10]));
261
262 assert_eq!(rx.next(), Some(10));
263 assert_eq!(rx.next(), Some(-10));
264 assert_eq!(rx.next(), Some(10));
265 assert_eq!(rx.next(), Some(-10));
266
267 for _ in 0..100000 {
268 assert_eq!(rx.next(), Some(0));
269 }
270 }
271
272 #[test]
273 #[ignore] // TODO: not yet implemented
no_delay_when_added()274 fn no_delay_when_added() {
275 let (tx, mut rx) = queue::queue(true);
276
277 for _ in 0..500 {
278 assert_eq!(rx.next(), Some(0));
279 }
280
281 tx.append(SamplesBuffer::new(1, 48000, vec![10i16, -10, 10, -10]));
282 assert_eq!(rx.next(), Some(10));
283 assert_eq!(rx.next(), Some(-10));
284 assert_eq!(rx.next(), Some(10));
285 assert_eq!(rx.next(), Some(-10));
286 }
287 }
288