1 extern crate alsa;
2 extern crate libc;
3 extern crate parking_lot;
4 
5 use self::alsa::poll::Descriptors;
6 use self::parking_lot::Mutex;
7 use crate::{
8     BackendSpecificError, BufferSize, BuildStreamError, ChannelCount, Data,
9     DefaultStreamConfigError, DeviceNameError, DevicesError, InputCallbackInfo, OutputCallbackInfo,
10     PauseStreamError, PlayStreamError, SampleFormat, SampleRate, StreamConfig, StreamError,
11     SupportedBufferSize, SupportedStreamConfig, SupportedStreamConfigRange,
12     SupportedStreamConfigsError,
13 };
14 use std::cmp;
15 use std::convert::TryInto;
16 use std::sync::Arc;
17 use std::thread::{self, JoinHandle};
18 use std::vec::IntoIter as VecIntoIter;
19 use traits::{DeviceTrait, HostTrait, StreamTrait};
20 
21 pub use self::enumerate::{default_input_device, default_output_device, Devices};
22 
23 pub type SupportedInputConfigs = VecIntoIter<SupportedStreamConfigRange>;
24 pub type SupportedOutputConfigs = VecIntoIter<SupportedStreamConfigRange>;
25 
26 mod enumerate;
27 
28 /// The default linux, dragonfly and freebsd host type.
29 #[derive(Debug)]
30 pub struct Host;
31 
32 impl Host {
new() -> Result<Self, crate::HostUnavailable>33     pub fn new() -> Result<Self, crate::HostUnavailable> {
34         Ok(Host)
35     }
36 }
37 
38 impl HostTrait for Host {
39     type Devices = Devices;
40     type Device = Device;
41 
is_available() -> bool42     fn is_available() -> bool {
43         // Assume ALSA is always available on linux/dragonfly/freebsd.
44         true
45     }
46 
devices(&self) -> Result<Self::Devices, DevicesError>47     fn devices(&self) -> Result<Self::Devices, DevicesError> {
48         Devices::new()
49     }
50 
default_input_device(&self) -> Option<Self::Device>51     fn default_input_device(&self) -> Option<Self::Device> {
52         default_input_device()
53     }
54 
default_output_device(&self) -> Option<Self::Device>55     fn default_output_device(&self) -> Option<Self::Device> {
56         default_output_device()
57     }
58 }
59 
60 impl DeviceTrait for Device {
61     type SupportedInputConfigs = SupportedInputConfigs;
62     type SupportedOutputConfigs = SupportedOutputConfigs;
63     type Stream = Stream;
64 
name(&self) -> Result<String, DeviceNameError>65     fn name(&self) -> Result<String, DeviceNameError> {
66         Device::name(self)
67     }
68 
supported_input_configs( &self, ) -> Result<Self::SupportedInputConfigs, SupportedStreamConfigsError>69     fn supported_input_configs(
70         &self,
71     ) -> Result<Self::SupportedInputConfigs, SupportedStreamConfigsError> {
72         Device::supported_input_configs(self)
73     }
74 
supported_output_configs( &self, ) -> Result<Self::SupportedOutputConfigs, SupportedStreamConfigsError>75     fn supported_output_configs(
76         &self,
77     ) -> Result<Self::SupportedOutputConfigs, SupportedStreamConfigsError> {
78         Device::supported_output_configs(self)
79     }
80 
default_input_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError>81     fn default_input_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
82         Device::default_input_config(self)
83     }
84 
default_output_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError>85     fn default_output_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
86         Device::default_output_config(self)
87     }
88 
build_input_stream_raw<D, E>( &self, conf: &StreamConfig, sample_format: SampleFormat, data_callback: D, error_callback: E, ) -> Result<Self::Stream, BuildStreamError> where D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, E: FnMut(StreamError) + Send + 'static,89     fn build_input_stream_raw<D, E>(
90         &self,
91         conf: &StreamConfig,
92         sample_format: SampleFormat,
93         data_callback: D,
94         error_callback: E,
95     ) -> Result<Self::Stream, BuildStreamError>
96     where
97         D: FnMut(&Data, &InputCallbackInfo) + Send + 'static,
98         E: FnMut(StreamError) + Send + 'static,
99     {
100         let stream_inner =
101             self.build_stream_inner(conf, sample_format, alsa::Direction::Capture)?;
102         let stream = Stream::new_input(Arc::new(stream_inner), data_callback, error_callback);
103         Ok(stream)
104     }
105 
build_output_stream_raw<D, E>( &self, conf: &StreamConfig, sample_format: SampleFormat, data_callback: D, error_callback: E, ) -> Result<Self::Stream, BuildStreamError> where D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, E: FnMut(StreamError) + Send + 'static,106     fn build_output_stream_raw<D, E>(
107         &self,
108         conf: &StreamConfig,
109         sample_format: SampleFormat,
110         data_callback: D,
111         error_callback: E,
112     ) -> Result<Self::Stream, BuildStreamError>
113     where
114         D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static,
115         E: FnMut(StreamError) + Send + 'static,
116     {
117         let stream_inner =
118             self.build_stream_inner(conf, sample_format, alsa::Direction::Playback)?;
119         let stream = Stream::new_output(Arc::new(stream_inner), data_callback, error_callback);
120         Ok(stream)
121     }
122 }
123 
124 struct TriggerSender(libc::c_int);
125 
126 struct TriggerReceiver(libc::c_int);
127 
128 impl TriggerSender {
wakeup(&self)129     fn wakeup(&self) {
130         let buf = 1u64;
131         let ret = unsafe { libc::write(self.0, &buf as *const u64 as *const _, 8) };
132         assert_eq!(ret, 8);
133     }
134 }
135 
136 impl TriggerReceiver {
clear_pipe(&self)137     fn clear_pipe(&self) {
138         let mut out = 0u64;
139         let ret = unsafe { libc::read(self.0, &mut out as *mut u64 as *mut _, 8) };
140         assert_eq!(ret, 8);
141     }
142 }
143 
trigger() -> (TriggerSender, TriggerReceiver)144 fn trigger() -> (TriggerSender, TriggerReceiver) {
145     let mut fds = [0, 0];
146     match unsafe { libc::pipe(fds.as_mut_ptr()) } {
147         0 => (TriggerSender(fds[1]), TriggerReceiver(fds[0])),
148         _ => panic!("Could not create pipe"),
149     }
150 }
151 
152 impl Drop for TriggerSender {
drop(&mut self)153     fn drop(&mut self) {
154         unsafe {
155             libc::close(self.0);
156         }
157     }
158 }
159 
160 impl Drop for TriggerReceiver {
drop(&mut self)161     fn drop(&mut self) {
162         unsafe {
163             libc::close(self.0);
164         }
165     }
166 }
167 
168 #[derive(Default)]
169 struct DeviceHandles {
170     playback: Option<alsa::PCM>,
171     capture: Option<alsa::PCM>,
172 }
173 
174 impl DeviceHandles {
175     /// Create `DeviceHandles` for `name` and try to open a handle for both
176     /// directions. Returns `Ok` if either direction is opened successfully.
open(name: &str) -> Result<Self, alsa::Error>177     fn open(name: &str) -> Result<Self, alsa::Error> {
178         let mut handles = Self::default();
179         let playback_err = handles.try_open(name, alsa::Direction::Playback).err();
180         let capture_err = handles.try_open(name, alsa::Direction::Capture).err();
181         if let Some(err) = capture_err.and(playback_err) {
182             Err(err)
183         } else {
184             Ok(handles)
185         }
186     }
187 
188     /// Get a mutable reference to the `Option` for a specific `stream_type`.
189     /// If the `Option` is `None`, the `alsa::PCM` will be opened and placed in
190     /// the `Option` before returning. If `handle_mut()` returns `Ok` the contained
191     /// `Option` is guaranteed to be `Some(..)`.
try_open( &mut self, name: &str, stream_type: alsa::Direction, ) -> Result<&mut Option<alsa::PCM>, alsa::Error>192     fn try_open(
193         &mut self,
194         name: &str,
195         stream_type: alsa::Direction,
196     ) -> Result<&mut Option<alsa::PCM>, alsa::Error> {
197         let handle = match stream_type {
198             alsa::Direction::Playback => &mut self.playback,
199             alsa::Direction::Capture => &mut self.capture,
200         };
201 
202         if handle.is_none() {
203             *handle = Some(alsa::pcm::PCM::new(name, stream_type, true)?);
204         }
205 
206         Ok(handle)
207     }
208 
209     /// Get a mutable reference to the `alsa::PCM` handle for a specific `stream_type`.
210     /// If the handle is not yet opened, it will be opened and stored in `self`.
get_mut( &mut self, name: &str, stream_type: alsa::Direction, ) -> Result<&mut alsa::PCM, alsa::Error>211     fn get_mut(
212         &mut self,
213         name: &str,
214         stream_type: alsa::Direction,
215     ) -> Result<&mut alsa::PCM, alsa::Error> {
216         Ok(self.try_open(name, stream_type)?.as_mut().unwrap())
217     }
218 
219     /// Take ownership of the `alsa::PCM` handle for a specific `stream_type`.
220     /// If the handle is not yet opened, it will be opened and returned.
take(&mut self, name: &str, stream_type: alsa::Direction) -> Result<alsa::PCM, alsa::Error>221     fn take(&mut self, name: &str, stream_type: alsa::Direction) -> Result<alsa::PCM, alsa::Error> {
222         Ok(self.try_open(name, stream_type)?.take().unwrap())
223     }
224 }
225 
226 pub struct Device {
227     name: String,
228     handles: Mutex<DeviceHandles>,
229 }
230 
231 impl Device {
build_stream_inner( &self, conf: &StreamConfig, sample_format: SampleFormat, stream_type: alsa::Direction, ) -> Result<StreamInner, BuildStreamError>232     fn build_stream_inner(
233         &self,
234         conf: &StreamConfig,
235         sample_format: SampleFormat,
236         stream_type: alsa::Direction,
237     ) -> Result<StreamInner, BuildStreamError> {
238         let handle_result = self
239             .handles
240             .lock()
241             .take(&self.name, stream_type)
242             .map_err(|e| (e, e.errno()));
243 
244         let handle = match handle_result {
245             Err((_, Some(nix::errno::Errno::EBUSY))) => {
246                 return Err(BuildStreamError::DeviceNotAvailable)
247             }
248             Err((_, Some(nix::errno::Errno::EINVAL))) => {
249                 return Err(BuildStreamError::InvalidArgument)
250             }
251             Err((e, _)) => return Err(e.into()),
252             Ok(handle) => handle,
253         };
254         let can_pause = set_hw_params_from_format(&handle, conf, sample_format)?;
255         let period_len = set_sw_params_from_format(&handle, conf, stream_type)?;
256 
257         handle.prepare()?;
258 
259         let num_descriptors = {
260             let num_descriptors = handle.count();
261             if num_descriptors == 0 {
262                 let description = "poll descriptor count for stream was 0".to_string();
263                 let err = BackendSpecificError { description };
264                 return Err(err.into());
265             }
266             num_descriptors
267         };
268 
269         // Check to see if we can retrieve valid timestamps from the device.
270         // Related: https://bugs.freedesktop.org/show_bug.cgi?id=88503
271         let ts = handle.status()?.get_htstamp();
272         let creation_instant = match (ts.tv_sec, ts.tv_nsec) {
273             (0, 0) => Some(std::time::Instant::now()),
274             _ => None,
275         };
276 
277         if let alsa::Direction::Capture = stream_type {
278             handle.start()?;
279         }
280 
281         let stream_inner = StreamInner {
282             channel: handle,
283             sample_format,
284             num_descriptors,
285             conf: conf.clone(),
286             period_len,
287             can_pause,
288             creation_instant,
289         };
290 
291         Ok(stream_inner)
292     }
293 
294     #[inline]
name(&self) -> Result<String, DeviceNameError>295     fn name(&self) -> Result<String, DeviceNameError> {
296         Ok(self.name.clone())
297     }
298 
supported_configs( &self, stream_t: alsa::Direction, ) -> Result<VecIntoIter<SupportedStreamConfigRange>, SupportedStreamConfigsError>299     fn supported_configs(
300         &self,
301         stream_t: alsa::Direction,
302     ) -> Result<VecIntoIter<SupportedStreamConfigRange>, SupportedStreamConfigsError> {
303         let mut guard = self.handles.lock();
304         let handle_result = guard
305             .get_mut(&self.name, stream_t)
306             .map_err(|e| (e, e.errno()));
307 
308         let handle = match handle_result {
309             Err((_, Some(nix::errno::Errno::ENOENT)))
310             | Err((_, Some(nix::errno::Errno::EBUSY))) => {
311                 return Err(SupportedStreamConfigsError::DeviceNotAvailable)
312             }
313             Err((_, Some(nix::errno::Errno::EINVAL))) => {
314                 return Err(SupportedStreamConfigsError::InvalidArgument)
315             }
316             Err((e, _)) => return Err(e.into()),
317             Ok(handle) => handle,
318         };
319 
320         let hw_params = alsa::pcm::HwParams::any(handle)?;
321 
322         // TODO: check endianess
323         const FORMATS: [(SampleFormat, alsa::pcm::Format); 3] = [
324             //SND_PCM_FORMAT_S8,
325             //SND_PCM_FORMAT_U8,
326             (SampleFormat::I16, alsa::pcm::Format::S16LE),
327             //SND_PCM_FORMAT_S16_BE,
328             (SampleFormat::U16, alsa::pcm::Format::U16LE),
329             //SND_PCM_FORMAT_U16_BE,
330             //SND_PCM_FORMAT_S24_LE,
331             //SND_PCM_FORMAT_S24_BE,
332             //SND_PCM_FORMAT_U24_LE,
333             //SND_PCM_FORMAT_U24_BE,
334             //SND_PCM_FORMAT_S32_LE,
335             //SND_PCM_FORMAT_S32_BE,
336             //SND_PCM_FORMAT_U32_LE,
337             //SND_PCM_FORMAT_U32_BE,
338             (SampleFormat::F32, alsa::pcm::Format::FloatLE),
339             //SND_PCM_FORMAT_FLOAT_BE,
340             //SND_PCM_FORMAT_FLOAT64_LE,
341             //SND_PCM_FORMAT_FLOAT64_BE,
342             //SND_PCM_FORMAT_IEC958_SUBFRAME_LE,
343             //SND_PCM_FORMAT_IEC958_SUBFRAME_BE,
344             //SND_PCM_FORMAT_MU_LAW,
345             //SND_PCM_FORMAT_A_LAW,
346             //SND_PCM_FORMAT_IMA_ADPCM,
347             //SND_PCM_FORMAT_MPEG,
348             //SND_PCM_FORMAT_GSM,
349             //SND_PCM_FORMAT_SPECIAL,
350             //SND_PCM_FORMAT_S24_3LE,
351             //SND_PCM_FORMAT_S24_3BE,
352             //SND_PCM_FORMAT_U24_3LE,
353             //SND_PCM_FORMAT_U24_3BE,
354             //SND_PCM_FORMAT_S20_3LE,
355             //SND_PCM_FORMAT_S20_3BE,
356             //SND_PCM_FORMAT_U20_3LE,
357             //SND_PCM_FORMAT_U20_3BE,
358             //SND_PCM_FORMAT_S18_3LE,
359             //SND_PCM_FORMAT_S18_3BE,
360             //SND_PCM_FORMAT_U18_3LE,
361             //SND_PCM_FORMAT_U18_3BE,
362         ];
363 
364         let mut supported_formats = Vec::new();
365         for &(sample_format, alsa_format) in FORMATS.iter() {
366             if hw_params.test_format(alsa_format).is_ok() {
367                 supported_formats.push(sample_format);
368             }
369         }
370 
371         let min_rate = hw_params.get_rate_min()?;
372         let max_rate = hw_params.get_rate_max()?;
373 
374         let sample_rates = if min_rate == max_rate || hw_params.test_rate(min_rate + 1).is_ok() {
375             vec![(min_rate, max_rate)]
376         } else {
377             const RATES: [libc::c_uint; 13] = [
378                 5512, 8000, 11025, 16000, 22050, 32000, 44100, 48000, 64000, 88200, 96000, 176400,
379                 192000,
380             ];
381 
382             let mut rates = Vec::new();
383             for &rate in RATES.iter() {
384                 if hw_params.test_rate(rate).is_ok() {
385                     rates.push((rate, rate));
386                 }
387             }
388 
389             if rates.is_empty() {
390                 vec![(min_rate, max_rate)]
391             } else {
392                 rates
393             }
394         };
395 
396         let min_channels = hw_params.get_channels_min()?;
397         let max_channels = hw_params.get_channels_max()?;
398 
399         let max_channels = cmp::min(max_channels, 32); // TODO: limiting to 32 channels or too much stuff is returned
400         let supported_channels = (min_channels..max_channels + 1)
401             .filter_map(|num| {
402                 if hw_params.test_channels(num).is_ok() {
403                     Some(num as ChannelCount)
404                 } else {
405                     None
406                 }
407             })
408             .collect::<Vec<_>>();
409 
410         let min_buffer_size = hw_params.get_buffer_size_min()?;
411         let max_buffer_size = hw_params.get_buffer_size_max()?;
412 
413         let buffer_size_range = SupportedBufferSize::Range {
414             min: min_buffer_size as u32,
415             max: max_buffer_size as u32,
416         };
417 
418         let mut output = Vec::with_capacity(
419             supported_formats.len() * supported_channels.len() * sample_rates.len(),
420         );
421         for &sample_format in supported_formats.iter() {
422             for &channels in supported_channels.iter() {
423                 for &(min_rate, max_rate) in sample_rates.iter() {
424                     output.push(SupportedStreamConfigRange {
425                         channels,
426                         min_sample_rate: SampleRate(min_rate as u32),
427                         max_sample_rate: SampleRate(max_rate as u32),
428                         buffer_size: buffer_size_range.clone(),
429                         sample_format,
430                     });
431                 }
432             }
433         }
434 
435         Ok(output.into_iter())
436     }
437 
supported_input_configs( &self, ) -> Result<SupportedInputConfigs, SupportedStreamConfigsError>438     fn supported_input_configs(
439         &self,
440     ) -> Result<SupportedInputConfigs, SupportedStreamConfigsError> {
441         self.supported_configs(alsa::Direction::Capture)
442     }
443 
supported_output_configs( &self, ) -> Result<SupportedOutputConfigs, SupportedStreamConfigsError>444     fn supported_output_configs(
445         &self,
446     ) -> Result<SupportedOutputConfigs, SupportedStreamConfigsError> {
447         self.supported_configs(alsa::Direction::Playback)
448     }
449 
450     // ALSA does not offer default stream formats, so instead we compare all supported formats by
451     // the `SupportedStreamConfigRange::cmp_default_heuristics` order and select the greatest.
default_config( &self, stream_t: alsa::Direction, ) -> Result<SupportedStreamConfig, DefaultStreamConfigError>452     fn default_config(
453         &self,
454         stream_t: alsa::Direction,
455     ) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
456         let mut formats: Vec<_> = {
457             match self.supported_configs(stream_t) {
458                 Err(SupportedStreamConfigsError::DeviceNotAvailable) => {
459                     return Err(DefaultStreamConfigError::DeviceNotAvailable);
460                 }
461                 Err(SupportedStreamConfigsError::InvalidArgument) => {
462                     // this happens sometimes when querying for input and output capabilities, but
463                     // the device supports only one
464                     return Err(DefaultStreamConfigError::StreamTypeNotSupported);
465                 }
466                 Err(SupportedStreamConfigsError::BackendSpecific { err }) => {
467                     return Err(err.into());
468                 }
469                 Ok(fmts) => fmts.collect(),
470             }
471         };
472 
473         formats.sort_by(|a, b| a.cmp_default_heuristics(b));
474 
475         match formats.into_iter().last() {
476             Some(f) => {
477                 let min_r = f.min_sample_rate;
478                 let max_r = f.max_sample_rate;
479                 let mut format = f.with_max_sample_rate();
480                 const HZ_44100: SampleRate = SampleRate(44_100);
481                 if min_r <= HZ_44100 && HZ_44100 <= max_r {
482                     format.sample_rate = HZ_44100;
483                 }
484                 Ok(format)
485             }
486             None => Err(DefaultStreamConfigError::StreamTypeNotSupported),
487         }
488     }
489 
default_input_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError>490     fn default_input_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
491         self.default_config(alsa::Direction::Capture)
492     }
493 
default_output_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError>494     fn default_output_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
495         self.default_config(alsa::Direction::Playback)
496     }
497 }
498 
499 struct StreamInner {
500     // The ALSA channel.
501     channel: alsa::pcm::PCM,
502 
503     // When converting between file descriptors and `snd_pcm_t`, this is the number of
504     // file descriptors that this `snd_pcm_t` uses.
505     num_descriptors: usize,
506 
507     // Format of the samples.
508     sample_format: SampleFormat,
509 
510     // The configuration used to open this stream.
511     conf: StreamConfig,
512 
513     // Minimum number of samples to put in the buffer.
514     period_len: usize,
515 
516     #[allow(dead_code)]
517     // Whether or not the hardware supports pausing the stream.
518     // TODO: We need an API to expose this. See #197, #284.
519     can_pause: bool,
520 
521     // In the case that the device does not return valid timestamps via `get_htstamp`, this field
522     // will be `Some` and will contain an `Instant` representing the moment the stream was created.
523     //
524     // If this field is `Some`, then the stream will use the duration since this instant as a
525     // source for timestamps.
526     //
527     // If this field is `None` then the elapsed duration between `get_trigger_htstamp` and
528     // `get_htstamp` is used.
529     creation_instant: Option<std::time::Instant>,
530 }
531 
532 // Assume that the ALSA library is built with thread safe option.
533 unsafe impl Sync for StreamInner {}
534 
535 #[derive(Debug, Eq, PartialEq)]
536 enum StreamType {
537     Input,
538     Output,
539 }
540 
541 pub struct Stream {
542     /// The high-priority audio processing thread calling callbacks.
543     /// Option used for moving out in destructor.
544     thread: Option<JoinHandle<()>>,
545 
546     /// Handle to the underlying stream for playback controls.
547     inner: Arc<StreamInner>,
548 
549     /// Used to signal to stop processing.
550     trigger: TriggerSender,
551 }
552 
553 #[derive(Default)]
554 struct StreamWorkerContext {
555     descriptors: Vec<libc::pollfd>,
556     buffer: Vec<u8>,
557 }
558 
input_stream_worker( rx: TriggerReceiver, stream: &StreamInner, data_callback: &mut (dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static), error_callback: &mut (dyn FnMut(StreamError) + Send + 'static), )559 fn input_stream_worker(
560     rx: TriggerReceiver,
561     stream: &StreamInner,
562     data_callback: &mut (dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static),
563     error_callback: &mut (dyn FnMut(StreamError) + Send + 'static),
564 ) {
565     let mut ctxt = StreamWorkerContext::default();
566     loop {
567         let flow = report_error(
568             poll_descriptors_and_prepare_buffer(&rx, stream, &mut ctxt),
569             error_callback,
570         )
571         .unwrap_or(PollDescriptorsFlow::Continue);
572 
573         match flow {
574             PollDescriptorsFlow::Continue => {
575                 continue;
576             }
577             PollDescriptorsFlow::XRun => {
578                 report_error(stream.channel.prepare(), error_callback);
579                 continue;
580             }
581             PollDescriptorsFlow::Return => return,
582             PollDescriptorsFlow::Ready {
583                 status,
584                 avail_frames: _,
585                 delay_frames,
586                 stream_type,
587             } => {
588                 assert_eq!(
589                     stream_type,
590                     StreamType::Input,
591                     "expected input stream, but polling descriptors indicated output",
592                 );
593                 let res = process_input(
594                     stream,
595                     &mut ctxt.buffer,
596                     status,
597                     delay_frames,
598                     data_callback,
599                 );
600                 report_error(res, error_callback);
601             }
602         }
603     }
604 }
605 
output_stream_worker( rx: TriggerReceiver, stream: &StreamInner, data_callback: &mut (dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static), error_callback: &mut (dyn FnMut(StreamError) + Send + 'static), )606 fn output_stream_worker(
607     rx: TriggerReceiver,
608     stream: &StreamInner,
609     data_callback: &mut (dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static),
610     error_callback: &mut (dyn FnMut(StreamError) + Send + 'static),
611 ) {
612     let mut ctxt = StreamWorkerContext::default();
613     loop {
614         let flow = report_error(
615             poll_descriptors_and_prepare_buffer(&rx, stream, &mut ctxt),
616             error_callback,
617         )
618         .unwrap_or(PollDescriptorsFlow::Continue);
619 
620         match flow {
621             PollDescriptorsFlow::Continue => continue,
622             PollDescriptorsFlow::XRun => {
623                 report_error(stream.channel.prepare(), error_callback);
624                 continue;
625             }
626             PollDescriptorsFlow::Return => return,
627             PollDescriptorsFlow::Ready {
628                 status,
629                 avail_frames,
630                 delay_frames,
631                 stream_type,
632             } => {
633                 assert_eq!(
634                     stream_type,
635                     StreamType::Output,
636                     "expected output stream, but polling descriptors indicated input",
637                 );
638                 let res = process_output(
639                     stream,
640                     &mut ctxt.buffer,
641                     status,
642                     avail_frames,
643                     delay_frames,
644                     data_callback,
645                     error_callback,
646                 );
647                 report_error(res, error_callback);
648             }
649         }
650     }
651 }
652 
report_error<T, E>( result: Result<T, E>, error_callback: &mut (dyn FnMut(StreamError) + Send + 'static), ) -> Option<T> where E: Into<StreamError>,653 fn report_error<T, E>(
654     result: Result<T, E>,
655     error_callback: &mut (dyn FnMut(StreamError) + Send + 'static),
656 ) -> Option<T>
657 where
658     E: Into<StreamError>,
659 {
660     match result {
661         Ok(val) => Some(val),
662         Err(err) => {
663             error_callback(err.into());
664             None
665         }
666     }
667 }
668 
669 enum PollDescriptorsFlow {
670     Continue,
671     Return,
672     Ready {
673         stream_type: StreamType,
674         status: alsa::pcm::Status,
675         avail_frames: usize,
676         delay_frames: usize,
677     },
678     XRun,
679 }
680 
681 // This block is shared between both input and output stream worker functions.
poll_descriptors_and_prepare_buffer( rx: &TriggerReceiver, stream: &StreamInner, ctxt: &mut StreamWorkerContext, ) -> Result<PollDescriptorsFlow, BackendSpecificError>682 fn poll_descriptors_and_prepare_buffer(
683     rx: &TriggerReceiver,
684     stream: &StreamInner,
685     ctxt: &mut StreamWorkerContext,
686 ) -> Result<PollDescriptorsFlow, BackendSpecificError> {
687     let StreamWorkerContext {
688         ref mut descriptors,
689         ref mut buffer,
690     } = *ctxt;
691 
692     descriptors.clear();
693 
694     // Add the self-pipe for signaling termination.
695     descriptors.push(libc::pollfd {
696         fd: rx.0,
697         events: libc::POLLIN,
698         revents: 0,
699     });
700 
701     // Add ALSA polling fds.
702     let len = descriptors.len();
703     descriptors.resize(
704         stream.num_descriptors + len,
705         libc::pollfd {
706             fd: 0,
707             events: 0,
708             revents: 0,
709         },
710     );
711     let filled = stream.channel.fill(&mut descriptors[len..])?;
712     debug_assert_eq!(filled, stream.num_descriptors);
713 
714     // Don't timeout, wait forever.
715     let res = alsa::poll::poll(descriptors, -1)?;
716     if res == 0 {
717         let description = String::from("`alsa::poll()` spuriously returned");
718         return Err(BackendSpecificError { description });
719     }
720 
721     if descriptors[0].revents != 0 {
722         // The stream has been requested to be destroyed.
723         rx.clear_pipe();
724         return Ok(PollDescriptorsFlow::Return);
725     }
726 
727     let stream_type = match stream.channel.revents(&descriptors[1..])? {
728         alsa::poll::Flags::OUT => StreamType::Output,
729         alsa::poll::Flags::IN => StreamType::Input,
730         _ => {
731             // Nothing to process, poll again
732             return Ok(PollDescriptorsFlow::Continue);
733         }
734     };
735 
736     let status = stream.channel.status()?;
737     let avail_frames = match stream.channel.avail() {
738         Err(err) if err.errno() == Some(nix::errno::Errno::EPIPE) => {
739             return Ok(PollDescriptorsFlow::XRun)
740         }
741         res => res,
742     }? as usize;
743     let delay_frames = match status.get_delay() {
744         // Buffer underrun. TODO: Notify the user.
745         d if d < 0 => 0,
746         d => d as usize,
747     };
748     let available_samples = avail_frames * stream.conf.channels as usize;
749 
750     // Only go on if there is at least `stream.period_len` samples.
751     if available_samples < stream.period_len {
752         return Ok(PollDescriptorsFlow::Continue);
753     }
754 
755     // Prepare the data buffer.
756     let buffer_size = stream.sample_format.sample_size() * available_samples;
757     buffer.resize(buffer_size, 0u8);
758 
759     Ok(PollDescriptorsFlow::Ready {
760         stream_type,
761         status,
762         avail_frames,
763         delay_frames,
764     })
765 }
766 
767 // Read input data from ALSA and deliver it to the user.
process_input( stream: &StreamInner, buffer: &mut [u8], status: alsa::pcm::Status, delay_frames: usize, data_callback: &mut (dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static), ) -> Result<(), BackendSpecificError>768 fn process_input(
769     stream: &StreamInner,
770     buffer: &mut [u8],
771     status: alsa::pcm::Status,
772     delay_frames: usize,
773     data_callback: &mut (dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static),
774 ) -> Result<(), BackendSpecificError> {
775     stream.channel.io_bytes().readi(buffer)?;
776     let sample_format = stream.sample_format;
777     let data = buffer.as_mut_ptr() as *mut ();
778     let len = buffer.len() / sample_format.sample_size();
779     let data = unsafe { Data::from_parts(data, len, sample_format) };
780     let callback = stream_timestamp(&status, stream.creation_instant)?;
781     let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate);
782     let capture = callback
783         .sub(delay_duration)
784         .expect("`capture` is earlier than representation supported by `StreamInstant`");
785     let timestamp = crate::InputStreamTimestamp { callback, capture };
786     let info = crate::InputCallbackInfo { timestamp };
787     data_callback(&data, &info);
788 
789     Ok(())
790 }
791 
792 // Request data from the user's function and write it via ALSA.
793 //
794 // Returns `true`
process_output( stream: &StreamInner, buffer: &mut [u8], status: alsa::pcm::Status, available_frames: usize, delay_frames: usize, data_callback: &mut (dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static), error_callback: &mut dyn FnMut(StreamError), ) -> Result<(), BackendSpecificError>795 fn process_output(
796     stream: &StreamInner,
797     buffer: &mut [u8],
798     status: alsa::pcm::Status,
799     available_frames: usize,
800     delay_frames: usize,
801     data_callback: &mut (dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static),
802     error_callback: &mut dyn FnMut(StreamError),
803 ) -> Result<(), BackendSpecificError> {
804     {
805         // We're now sure that we're ready to write data.
806         let sample_format = stream.sample_format;
807         let data = buffer.as_mut_ptr() as *mut ();
808         let len = buffer.len() / sample_format.sample_size();
809         let mut data = unsafe { Data::from_parts(data, len, sample_format) };
810         let callback = stream_timestamp(&status, stream.creation_instant)?;
811         let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate);
812         let playback = callback
813             .add(delay_duration)
814             .expect("`playback` occurs beyond representation supported by `StreamInstant`");
815         let timestamp = crate::OutputStreamTimestamp { callback, playback };
816         let info = crate::OutputCallbackInfo { timestamp };
817         data_callback(&mut data, &info);
818     }
819     loop {
820         match stream.channel.io_bytes().writei(buffer) {
821             Err(err) if err.errno() == Some(nix::errno::Errno::EPIPE) => {
822                 // buffer underrun
823                 // TODO: Notify the user of this.
824                 let _ = stream.channel.try_recover(err, false);
825             }
826             Err(err) => {
827                 error_callback(err.into());
828                 continue;
829             }
830             Ok(result) if result != available_frames => {
831                 let description = format!(
832                     "unexpected number of frames written: expected {}, \
833                      result {} (this should never happen)",
834                     available_frames, result,
835                 );
836                 error_callback(BackendSpecificError { description }.into());
837                 continue;
838             }
839             _ => {
840                 break;
841             }
842         }
843     }
844     Ok(())
845 }
846 
847 // Use the elapsed duration since the start of the stream.
848 //
849 // This ensures positive values that are compatible with our `StreamInstant` representation.
stream_timestamp( status: &alsa::pcm::Status, creation_instant: Option<std::time::Instant>, ) -> Result<crate::StreamInstant, BackendSpecificError>850 fn stream_timestamp(
851     status: &alsa::pcm::Status,
852     creation_instant: Option<std::time::Instant>,
853 ) -> Result<crate::StreamInstant, BackendSpecificError> {
854     match creation_instant {
855         None => {
856             let trigger_ts = status.get_trigger_htstamp();
857             let ts = status.get_htstamp();
858             let nanos = timespec_diff_nanos(ts, trigger_ts);
859             if nanos < 0 {
860                 panic!(
861                     "get_htstamp `{:?}` was earlier than get_trigger_htstamp `{:?}`",
862                     ts, trigger_ts
863                 );
864             }
865             Ok(crate::StreamInstant::from_nanos(nanos))
866         }
867         Some(creation) => {
868             let now = std::time::Instant::now();
869             let duration = now.duration_since(creation);
870             let instant = crate::StreamInstant::from_nanos_i128(duration.as_nanos() as i128)
871                 .expect("stream duration has exceeded `StreamInstant` representation");
872             Ok(instant)
873         }
874     }
875 }
876 
877 // Adapted from `timestamp2ns` here:
878 // https://fossies.org/linux/alsa-lib/test/audio_time.c
timespec_to_nanos(ts: libc::timespec) -> i64879 fn timespec_to_nanos(ts: libc::timespec) -> i64 {
880     ts.tv_sec as i64 * 1_000_000_000 + ts.tv_nsec as i64
881 }
882 
883 // Adapted from `timediff` here:
884 // https://fossies.org/linux/alsa-lib/test/audio_time.c
timespec_diff_nanos(a: libc::timespec, b: libc::timespec) -> i64885 fn timespec_diff_nanos(a: libc::timespec, b: libc::timespec) -> i64 {
886     timespec_to_nanos(a) - timespec_to_nanos(b)
887 }
888 
889 // Convert the given duration in frames at the given sample rate to a `std::time::Duration`.
frames_to_duration(frames: usize, rate: crate::SampleRate) -> std::time::Duration890 fn frames_to_duration(frames: usize, rate: crate::SampleRate) -> std::time::Duration {
891     let secsf = frames as f64 / rate.0 as f64;
892     let secs = secsf as u64;
893     let nanos = ((secsf - secs as f64) * 1_000_000_000.0) as u32;
894     std::time::Duration::new(secs, nanos)
895 }
896 
897 impl Stream {
new_input<D, E>( inner: Arc<StreamInner>, mut data_callback: D, mut error_callback: E, ) -> Stream where D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, E: FnMut(StreamError) + Send + 'static,898     fn new_input<D, E>(
899         inner: Arc<StreamInner>,
900         mut data_callback: D,
901         mut error_callback: E,
902     ) -> Stream
903     where
904         D: FnMut(&Data, &InputCallbackInfo) + Send + 'static,
905         E: FnMut(StreamError) + Send + 'static,
906     {
907         let (tx, rx) = trigger();
908         // Clone the handle for passing into worker thread.
909         let stream = inner.clone();
910         let thread = thread::Builder::new()
911             .name("cpal_alsa_in".to_owned())
912             .spawn(move || {
913                 input_stream_worker(rx, &*stream, &mut data_callback, &mut error_callback);
914             })
915             .unwrap();
916         Stream {
917             thread: Some(thread),
918             inner,
919             trigger: tx,
920         }
921     }
922 
new_output<D, E>( inner: Arc<StreamInner>, mut data_callback: D, mut error_callback: E, ) -> Stream where D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, E: FnMut(StreamError) + Send + 'static,923     fn new_output<D, E>(
924         inner: Arc<StreamInner>,
925         mut data_callback: D,
926         mut error_callback: E,
927     ) -> Stream
928     where
929         D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static,
930         E: FnMut(StreamError) + Send + 'static,
931     {
932         let (tx, rx) = trigger();
933         // Clone the handle for passing into worker thread.
934         let stream = inner.clone();
935         let thread = thread::Builder::new()
936             .name("cpal_alsa_out".to_owned())
937             .spawn(move || {
938                 output_stream_worker(rx, &*stream, &mut data_callback, &mut error_callback);
939             })
940             .unwrap();
941         Stream {
942             thread: Some(thread),
943             inner,
944             trigger: tx,
945         }
946     }
947 }
948 
949 impl Drop for Stream {
drop(&mut self)950     fn drop(&mut self) {
951         self.trigger.wakeup();
952         self.thread.take().unwrap().join().unwrap();
953     }
954 }
955 
956 impl StreamTrait for Stream {
play(&self) -> Result<(), PlayStreamError>957     fn play(&self) -> Result<(), PlayStreamError> {
958         self.inner.channel.pause(false).ok();
959         Ok(())
960     }
pause(&self) -> Result<(), PauseStreamError>961     fn pause(&self) -> Result<(), PauseStreamError> {
962         self.inner.channel.pause(true).ok();
963         Ok(())
964     }
965 }
966 
set_hw_params_from_format( pcm_handle: &alsa::pcm::PCM, config: &StreamConfig, sample_format: SampleFormat, ) -> Result<bool, BackendSpecificError>967 fn set_hw_params_from_format(
968     pcm_handle: &alsa::pcm::PCM,
969     config: &StreamConfig,
970     sample_format: SampleFormat,
971 ) -> Result<bool, BackendSpecificError> {
972     let hw_params = alsa::pcm::HwParams::any(pcm_handle)?;
973     hw_params.set_access(alsa::pcm::Access::RWInterleaved)?;
974 
975     let sample_format = if cfg!(target_endian = "big") {
976         match sample_format {
977             SampleFormat::I16 => alsa::pcm::Format::S16BE,
978             SampleFormat::U16 => alsa::pcm::Format::U16BE,
979             SampleFormat::F32 => alsa::pcm::Format::FloatBE,
980         }
981     } else {
982         match sample_format {
983             SampleFormat::I16 => alsa::pcm::Format::S16LE,
984             SampleFormat::U16 => alsa::pcm::Format::U16LE,
985             SampleFormat::F32 => alsa::pcm::Format::FloatLE,
986         }
987     };
988 
989     hw_params.set_format(sample_format)?;
990     hw_params.set_rate(config.sample_rate.0, alsa::ValueOr::Nearest)?;
991     hw_params.set_channels(config.channels as u32)?;
992 
993     match config.buffer_size {
994         BufferSize::Fixed(v) => {
995             hw_params.set_period_size_near((v / 4) as alsa::pcm::Frames, alsa::ValueOr::Nearest)?;
996             hw_params.set_buffer_size(v as alsa::pcm::Frames)?;
997         }
998         BufferSize::Default => {
999             // These values together represent a moderate latency and wakeup interval.
1000             // Without them, we are at the mercy of the device
1001             hw_params.set_period_time_near(25_000, alsa::ValueOr::Nearest)?;
1002             hw_params.set_buffer_time_near(100_000, alsa::ValueOr::Nearest)?;
1003         }
1004     }
1005 
1006     pcm_handle.hw_params(&hw_params)?;
1007 
1008     Ok(hw_params.can_pause())
1009 }
1010 
set_sw_params_from_format( pcm_handle: &alsa::pcm::PCM, config: &StreamConfig, stream_type: alsa::Direction, ) -> Result<usize, BackendSpecificError>1011 fn set_sw_params_from_format(
1012     pcm_handle: &alsa::pcm::PCM,
1013     config: &StreamConfig,
1014     stream_type: alsa::Direction,
1015 ) -> Result<usize, BackendSpecificError> {
1016     let sw_params = pcm_handle.sw_params_current()?;
1017 
1018     let period_len = {
1019         let (buffer, period) = pcm_handle.get_params()?;
1020         if buffer == 0 {
1021             return Err(BackendSpecificError {
1022                 description: "initialization resulted in a null buffer".to_string(),
1023             });
1024         }
1025         sw_params.set_avail_min(period as alsa::pcm::Frames)?;
1026 
1027         let start_threshold = match stream_type {
1028             alsa::Direction::Playback => buffer - period,
1029             alsa::Direction::Capture => 1,
1030         };
1031         sw_params.set_start_threshold(start_threshold.try_into().unwrap())?;
1032 
1033         period as usize * config.channels as usize
1034     };
1035 
1036     sw_params.set_tstamp_mode(true)?;
1037     sw_params.set_tstamp_type(alsa::pcm::TstampType::MonotonicRaw)?;
1038 
1039     pcm_handle.sw_params(&sw_params)?;
1040 
1041     Ok(period_len)
1042 }
1043 
1044 impl From<alsa::Error> for BackendSpecificError {
from(err: alsa::Error) -> Self1045     fn from(err: alsa::Error) -> Self {
1046         BackendSpecificError {
1047             description: err.to_string(),
1048         }
1049     }
1050 }
1051 
1052 impl From<alsa::Error> for BuildStreamError {
from(err: alsa::Error) -> Self1053     fn from(err: alsa::Error) -> Self {
1054         let err: BackendSpecificError = err.into();
1055         err.into()
1056     }
1057 }
1058 
1059 impl From<alsa::Error> for SupportedStreamConfigsError {
from(err: alsa::Error) -> Self1060     fn from(err: alsa::Error) -> Self {
1061         let err: BackendSpecificError = err.into();
1062         err.into()
1063     }
1064 }
1065 
1066 impl From<alsa::Error> for PlayStreamError {
from(err: alsa::Error) -> Self1067     fn from(err: alsa::Error) -> Self {
1068         let err: BackendSpecificError = err.into();
1069         err.into()
1070     }
1071 }
1072 
1073 impl From<alsa::Error> for PauseStreamError {
from(err: alsa::Error) -> Self1074     fn from(err: alsa::Error) -> Self {
1075         let err: BackendSpecificError = err.into();
1076         err.into()
1077     }
1078 }
1079 
1080 impl From<alsa::Error> for StreamError {
from(err: alsa::Error) -> Self1081     fn from(err: alsa::Error) -> Self {
1082         let err: BackendSpecificError = err.into();
1083         err.into()
1084     }
1085 }
1086