1 extern crate alsa;
2 extern crate libc;
3 extern crate parking_lot;
4
5 use self::alsa::poll::Descriptors;
6 use self::parking_lot::Mutex;
7 use crate::{
8 BackendSpecificError, BufferSize, BuildStreamError, ChannelCount, Data,
9 DefaultStreamConfigError, DeviceNameError, DevicesError, InputCallbackInfo, OutputCallbackInfo,
10 PauseStreamError, PlayStreamError, SampleFormat, SampleRate, StreamConfig, StreamError,
11 SupportedBufferSize, SupportedStreamConfig, SupportedStreamConfigRange,
12 SupportedStreamConfigsError,
13 };
14 use std::cmp;
15 use std::convert::TryInto;
16 use std::sync::Arc;
17 use std::thread::{self, JoinHandle};
18 use std::vec::IntoIter as VecIntoIter;
19 use traits::{DeviceTrait, HostTrait, StreamTrait};
20
21 pub use self::enumerate::{default_input_device, default_output_device, Devices};
22
23 pub type SupportedInputConfigs = VecIntoIter<SupportedStreamConfigRange>;
24 pub type SupportedOutputConfigs = VecIntoIter<SupportedStreamConfigRange>;
25
26 mod enumerate;
27
/// The default host type on Linux, DragonFly BSD and FreeBSD, backed by ALSA.
#[derive(Debug)]
pub struct Host;
31
32 impl Host {
new() -> Result<Self, crate::HostUnavailable>33 pub fn new() -> Result<Self, crate::HostUnavailable> {
34 Ok(Host)
35 }
36 }
37
38 impl HostTrait for Host {
39 type Devices = Devices;
40 type Device = Device;
41
is_available() -> bool42 fn is_available() -> bool {
43 // Assume ALSA is always available on linux/dragonfly/freebsd.
44 true
45 }
46
devices(&self) -> Result<Self::Devices, DevicesError>47 fn devices(&self) -> Result<Self::Devices, DevicesError> {
48 Devices::new()
49 }
50
default_input_device(&self) -> Option<Self::Device>51 fn default_input_device(&self) -> Option<Self::Device> {
52 default_input_device()
53 }
54
default_output_device(&self) -> Option<Self::Device>55 fn default_output_device(&self) -> Option<Self::Device> {
56 default_output_device()
57 }
58 }
59
60 impl DeviceTrait for Device {
61 type SupportedInputConfigs = SupportedInputConfigs;
62 type SupportedOutputConfigs = SupportedOutputConfigs;
63 type Stream = Stream;
64
name(&self) -> Result<String, DeviceNameError>65 fn name(&self) -> Result<String, DeviceNameError> {
66 Device::name(self)
67 }
68
supported_input_configs( &self, ) -> Result<Self::SupportedInputConfigs, SupportedStreamConfigsError>69 fn supported_input_configs(
70 &self,
71 ) -> Result<Self::SupportedInputConfigs, SupportedStreamConfigsError> {
72 Device::supported_input_configs(self)
73 }
74
supported_output_configs( &self, ) -> Result<Self::SupportedOutputConfigs, SupportedStreamConfigsError>75 fn supported_output_configs(
76 &self,
77 ) -> Result<Self::SupportedOutputConfigs, SupportedStreamConfigsError> {
78 Device::supported_output_configs(self)
79 }
80
default_input_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError>81 fn default_input_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
82 Device::default_input_config(self)
83 }
84
default_output_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError>85 fn default_output_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
86 Device::default_output_config(self)
87 }
88
build_input_stream_raw<D, E>( &self, conf: &StreamConfig, sample_format: SampleFormat, data_callback: D, error_callback: E, ) -> Result<Self::Stream, BuildStreamError> where D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, E: FnMut(StreamError) + Send + 'static,89 fn build_input_stream_raw<D, E>(
90 &self,
91 conf: &StreamConfig,
92 sample_format: SampleFormat,
93 data_callback: D,
94 error_callback: E,
95 ) -> Result<Self::Stream, BuildStreamError>
96 where
97 D: FnMut(&Data, &InputCallbackInfo) + Send + 'static,
98 E: FnMut(StreamError) + Send + 'static,
99 {
100 let stream_inner =
101 self.build_stream_inner(conf, sample_format, alsa::Direction::Capture)?;
102 let stream = Stream::new_input(Arc::new(stream_inner), data_callback, error_callback);
103 Ok(stream)
104 }
105
build_output_stream_raw<D, E>( &self, conf: &StreamConfig, sample_format: SampleFormat, data_callback: D, error_callback: E, ) -> Result<Self::Stream, BuildStreamError> where D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, E: FnMut(StreamError) + Send + 'static,106 fn build_output_stream_raw<D, E>(
107 &self,
108 conf: &StreamConfig,
109 sample_format: SampleFormat,
110 data_callback: D,
111 error_callback: E,
112 ) -> Result<Self::Stream, BuildStreamError>
113 where
114 D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static,
115 E: FnMut(StreamError) + Send + 'static,
116 {
117 let stream_inner =
118 self.build_stream_inner(conf, sample_format, alsa::Direction::Playback)?;
119 let stream = Stream::new_output(Arc::new(stream_inner), data_callback, error_callback);
120 Ok(stream)
121 }
122 }
123
/// Write end of the pipe created by `trigger()`; wraps the raw file descriptor.
struct TriggerSender(libc::c_int);
125
/// Read end of the pipe created by `trigger()`; wraps the raw file descriptor.
struct TriggerReceiver(libc::c_int);
127
128 impl TriggerSender {
wakeup(&self)129 fn wakeup(&self) {
130 let buf = 1u64;
131 let ret = unsafe { libc::write(self.0, &buf as *const u64 as *const _, 8) };
132 assert_eq!(ret, 8);
133 }
134 }
135
136 impl TriggerReceiver {
clear_pipe(&self)137 fn clear_pipe(&self) {
138 let mut out = 0u64;
139 let ret = unsafe { libc::read(self.0, &mut out as *mut u64 as *mut _, 8) };
140 assert_eq!(ret, 8);
141 }
142 }
143
trigger() -> (TriggerSender, TriggerReceiver)144 fn trigger() -> (TriggerSender, TriggerReceiver) {
145 let mut fds = [0, 0];
146 match unsafe { libc::pipe(fds.as_mut_ptr()) } {
147 0 => (TriggerSender(fds[1]), TriggerReceiver(fds[0])),
148 _ => panic!("Could not create pipe"),
149 }
150 }
151
152 impl Drop for TriggerSender {
drop(&mut self)153 fn drop(&mut self) {
154 unsafe {
155 libc::close(self.0);
156 }
157 }
158 }
159
160 impl Drop for TriggerReceiver {
drop(&mut self)161 fn drop(&mut self) {
162 unsafe {
163 libc::close(self.0);
164 }
165 }
166 }
167
/// ALSA PCM handles for a single device, one optional slot per direction.
///
/// A slot is `None` until `try_open` fills it, and becomes `None` again once the handle has
/// been moved out with `take`.
#[derive(Default)]
struct DeviceHandles {
    /// Handle opened with `alsa::Direction::Playback`, if currently held.
    playback: Option<alsa::PCM>,
    /// Handle opened with `alsa::Direction::Capture`, if currently held.
    capture: Option<alsa::PCM>,
}
173
174 impl DeviceHandles {
175 /// Create `DeviceHandles` for `name` and try to open a handle for both
176 /// directions. Returns `Ok` if either direction is opened successfully.
open(name: &str) -> Result<Self, alsa::Error>177 fn open(name: &str) -> Result<Self, alsa::Error> {
178 let mut handles = Self::default();
179 let playback_err = handles.try_open(name, alsa::Direction::Playback).err();
180 let capture_err = handles.try_open(name, alsa::Direction::Capture).err();
181 if let Some(err) = capture_err.and(playback_err) {
182 Err(err)
183 } else {
184 Ok(handles)
185 }
186 }
187
188 /// Get a mutable reference to the `Option` for a specific `stream_type`.
189 /// If the `Option` is `None`, the `alsa::PCM` will be opened and placed in
190 /// the `Option` before returning. If `handle_mut()` returns `Ok` the contained
191 /// `Option` is guaranteed to be `Some(..)`.
try_open( &mut self, name: &str, stream_type: alsa::Direction, ) -> Result<&mut Option<alsa::PCM>, alsa::Error>192 fn try_open(
193 &mut self,
194 name: &str,
195 stream_type: alsa::Direction,
196 ) -> Result<&mut Option<alsa::PCM>, alsa::Error> {
197 let handle = match stream_type {
198 alsa::Direction::Playback => &mut self.playback,
199 alsa::Direction::Capture => &mut self.capture,
200 };
201
202 if handle.is_none() {
203 *handle = Some(alsa::pcm::PCM::new(name, stream_type, true)?);
204 }
205
206 Ok(handle)
207 }
208
209 /// Get a mutable reference to the `alsa::PCM` handle for a specific `stream_type`.
210 /// If the handle is not yet opened, it will be opened and stored in `self`.
get_mut( &mut self, name: &str, stream_type: alsa::Direction, ) -> Result<&mut alsa::PCM, alsa::Error>211 fn get_mut(
212 &mut self,
213 name: &str,
214 stream_type: alsa::Direction,
215 ) -> Result<&mut alsa::PCM, alsa::Error> {
216 Ok(self.try_open(name, stream_type)?.as_mut().unwrap())
217 }
218
219 /// Take ownership of the `alsa::PCM` handle for a specific `stream_type`.
220 /// If the handle is not yet opened, it will be opened and returned.
take(&mut self, name: &str, stream_type: alsa::Direction) -> Result<alsa::PCM, alsa::Error>221 fn take(&mut self, name: &str, stream_type: alsa::Direction) -> Result<alsa::PCM, alsa::Error> {
222 Ok(self.try_open(name, stream_type)?.take().unwrap())
223 }
224 }
225
/// An ALSA device, identified by its name, together with its PCM handles.
pub struct Device {
    /// Name handed to ALSA whenever a PCM handle for this device is opened.
    name: String,
    /// PCM handles for this device, guarded by a mutex.
    handles: Mutex<DeviceHandles>,
}
230
231 impl Device {
build_stream_inner( &self, conf: &StreamConfig, sample_format: SampleFormat, stream_type: alsa::Direction, ) -> Result<StreamInner, BuildStreamError>232 fn build_stream_inner(
233 &self,
234 conf: &StreamConfig,
235 sample_format: SampleFormat,
236 stream_type: alsa::Direction,
237 ) -> Result<StreamInner, BuildStreamError> {
238 let handle_result = self
239 .handles
240 .lock()
241 .take(&self.name, stream_type)
242 .map_err(|e| (e, e.errno()));
243
244 let handle = match handle_result {
245 Err((_, Some(nix::errno::Errno::EBUSY))) => {
246 return Err(BuildStreamError::DeviceNotAvailable)
247 }
248 Err((_, Some(nix::errno::Errno::EINVAL))) => {
249 return Err(BuildStreamError::InvalidArgument)
250 }
251 Err((e, _)) => return Err(e.into()),
252 Ok(handle) => handle,
253 };
254 let can_pause = set_hw_params_from_format(&handle, conf, sample_format)?;
255 let period_len = set_sw_params_from_format(&handle, conf, stream_type)?;
256
257 handle.prepare()?;
258
259 let num_descriptors = {
260 let num_descriptors = handle.count();
261 if num_descriptors == 0 {
262 let description = "poll descriptor count for stream was 0".to_string();
263 let err = BackendSpecificError { description };
264 return Err(err.into());
265 }
266 num_descriptors
267 };
268
269 // Check to see if we can retrieve valid timestamps from the device.
270 // Related: https://bugs.freedesktop.org/show_bug.cgi?id=88503
271 let ts = handle.status()?.get_htstamp();
272 let creation_instant = match (ts.tv_sec, ts.tv_nsec) {
273 (0, 0) => Some(std::time::Instant::now()),
274 _ => None,
275 };
276
277 if let alsa::Direction::Capture = stream_type {
278 handle.start()?;
279 }
280
281 let stream_inner = StreamInner {
282 channel: handle,
283 sample_format,
284 num_descriptors,
285 conf: conf.clone(),
286 period_len,
287 can_pause,
288 creation_instant,
289 };
290
291 Ok(stream_inner)
292 }
293
294 #[inline]
name(&self) -> Result<String, DeviceNameError>295 fn name(&self) -> Result<String, DeviceNameError> {
296 Ok(self.name.clone())
297 }
298
supported_configs( &self, stream_t: alsa::Direction, ) -> Result<VecIntoIter<SupportedStreamConfigRange>, SupportedStreamConfigsError>299 fn supported_configs(
300 &self,
301 stream_t: alsa::Direction,
302 ) -> Result<VecIntoIter<SupportedStreamConfigRange>, SupportedStreamConfigsError> {
303 let mut guard = self.handles.lock();
304 let handle_result = guard
305 .get_mut(&self.name, stream_t)
306 .map_err(|e| (e, e.errno()));
307
308 let handle = match handle_result {
309 Err((_, Some(nix::errno::Errno::ENOENT)))
310 | Err((_, Some(nix::errno::Errno::EBUSY))) => {
311 return Err(SupportedStreamConfigsError::DeviceNotAvailable)
312 }
313 Err((_, Some(nix::errno::Errno::EINVAL))) => {
314 return Err(SupportedStreamConfigsError::InvalidArgument)
315 }
316 Err((e, _)) => return Err(e.into()),
317 Ok(handle) => handle,
318 };
319
320 let hw_params = alsa::pcm::HwParams::any(handle)?;
321
322 // TODO: check endianess
323 const FORMATS: [(SampleFormat, alsa::pcm::Format); 3] = [
324 //SND_PCM_FORMAT_S8,
325 //SND_PCM_FORMAT_U8,
326 (SampleFormat::I16, alsa::pcm::Format::S16LE),
327 //SND_PCM_FORMAT_S16_BE,
328 (SampleFormat::U16, alsa::pcm::Format::U16LE),
329 //SND_PCM_FORMAT_U16_BE,
330 //SND_PCM_FORMAT_S24_LE,
331 //SND_PCM_FORMAT_S24_BE,
332 //SND_PCM_FORMAT_U24_LE,
333 //SND_PCM_FORMAT_U24_BE,
334 //SND_PCM_FORMAT_S32_LE,
335 //SND_PCM_FORMAT_S32_BE,
336 //SND_PCM_FORMAT_U32_LE,
337 //SND_PCM_FORMAT_U32_BE,
338 (SampleFormat::F32, alsa::pcm::Format::FloatLE),
339 //SND_PCM_FORMAT_FLOAT_BE,
340 //SND_PCM_FORMAT_FLOAT64_LE,
341 //SND_PCM_FORMAT_FLOAT64_BE,
342 //SND_PCM_FORMAT_IEC958_SUBFRAME_LE,
343 //SND_PCM_FORMAT_IEC958_SUBFRAME_BE,
344 //SND_PCM_FORMAT_MU_LAW,
345 //SND_PCM_FORMAT_A_LAW,
346 //SND_PCM_FORMAT_IMA_ADPCM,
347 //SND_PCM_FORMAT_MPEG,
348 //SND_PCM_FORMAT_GSM,
349 //SND_PCM_FORMAT_SPECIAL,
350 //SND_PCM_FORMAT_S24_3LE,
351 //SND_PCM_FORMAT_S24_3BE,
352 //SND_PCM_FORMAT_U24_3LE,
353 //SND_PCM_FORMAT_U24_3BE,
354 //SND_PCM_FORMAT_S20_3LE,
355 //SND_PCM_FORMAT_S20_3BE,
356 //SND_PCM_FORMAT_U20_3LE,
357 //SND_PCM_FORMAT_U20_3BE,
358 //SND_PCM_FORMAT_S18_3LE,
359 //SND_PCM_FORMAT_S18_3BE,
360 //SND_PCM_FORMAT_U18_3LE,
361 //SND_PCM_FORMAT_U18_3BE,
362 ];
363
364 let mut supported_formats = Vec::new();
365 for &(sample_format, alsa_format) in FORMATS.iter() {
366 if hw_params.test_format(alsa_format).is_ok() {
367 supported_formats.push(sample_format);
368 }
369 }
370
371 let min_rate = hw_params.get_rate_min()?;
372 let max_rate = hw_params.get_rate_max()?;
373
374 let sample_rates = if min_rate == max_rate || hw_params.test_rate(min_rate + 1).is_ok() {
375 vec![(min_rate, max_rate)]
376 } else {
377 const RATES: [libc::c_uint; 13] = [
378 5512, 8000, 11025, 16000, 22050, 32000, 44100, 48000, 64000, 88200, 96000, 176400,
379 192000,
380 ];
381
382 let mut rates = Vec::new();
383 for &rate in RATES.iter() {
384 if hw_params.test_rate(rate).is_ok() {
385 rates.push((rate, rate));
386 }
387 }
388
389 if rates.is_empty() {
390 vec![(min_rate, max_rate)]
391 } else {
392 rates
393 }
394 };
395
396 let min_channels = hw_params.get_channels_min()?;
397 let max_channels = hw_params.get_channels_max()?;
398
399 let max_channels = cmp::min(max_channels, 32); // TODO: limiting to 32 channels or too much stuff is returned
400 let supported_channels = (min_channels..max_channels + 1)
401 .filter_map(|num| {
402 if hw_params.test_channels(num).is_ok() {
403 Some(num as ChannelCount)
404 } else {
405 None
406 }
407 })
408 .collect::<Vec<_>>();
409
410 let min_buffer_size = hw_params.get_buffer_size_min()?;
411 let max_buffer_size = hw_params.get_buffer_size_max()?;
412
413 let buffer_size_range = SupportedBufferSize::Range {
414 min: min_buffer_size as u32,
415 max: max_buffer_size as u32,
416 };
417
418 let mut output = Vec::with_capacity(
419 supported_formats.len() * supported_channels.len() * sample_rates.len(),
420 );
421 for &sample_format in supported_formats.iter() {
422 for &channels in supported_channels.iter() {
423 for &(min_rate, max_rate) in sample_rates.iter() {
424 output.push(SupportedStreamConfigRange {
425 channels,
426 min_sample_rate: SampleRate(min_rate as u32),
427 max_sample_rate: SampleRate(max_rate as u32),
428 buffer_size: buffer_size_range.clone(),
429 sample_format,
430 });
431 }
432 }
433 }
434
435 Ok(output.into_iter())
436 }
437
supported_input_configs( &self, ) -> Result<SupportedInputConfigs, SupportedStreamConfigsError>438 fn supported_input_configs(
439 &self,
440 ) -> Result<SupportedInputConfigs, SupportedStreamConfigsError> {
441 self.supported_configs(alsa::Direction::Capture)
442 }
443
supported_output_configs( &self, ) -> Result<SupportedOutputConfigs, SupportedStreamConfigsError>444 fn supported_output_configs(
445 &self,
446 ) -> Result<SupportedOutputConfigs, SupportedStreamConfigsError> {
447 self.supported_configs(alsa::Direction::Playback)
448 }
449
450 // ALSA does not offer default stream formats, so instead we compare all supported formats by
451 // the `SupportedStreamConfigRange::cmp_default_heuristics` order and select the greatest.
default_config( &self, stream_t: alsa::Direction, ) -> Result<SupportedStreamConfig, DefaultStreamConfigError>452 fn default_config(
453 &self,
454 stream_t: alsa::Direction,
455 ) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
456 let mut formats: Vec<_> = {
457 match self.supported_configs(stream_t) {
458 Err(SupportedStreamConfigsError::DeviceNotAvailable) => {
459 return Err(DefaultStreamConfigError::DeviceNotAvailable);
460 }
461 Err(SupportedStreamConfigsError::InvalidArgument) => {
462 // this happens sometimes when querying for input and output capabilities, but
463 // the device supports only one
464 return Err(DefaultStreamConfigError::StreamTypeNotSupported);
465 }
466 Err(SupportedStreamConfigsError::BackendSpecific { err }) => {
467 return Err(err.into());
468 }
469 Ok(fmts) => fmts.collect(),
470 }
471 };
472
473 formats.sort_by(|a, b| a.cmp_default_heuristics(b));
474
475 match formats.into_iter().last() {
476 Some(f) => {
477 let min_r = f.min_sample_rate;
478 let max_r = f.max_sample_rate;
479 let mut format = f.with_max_sample_rate();
480 const HZ_44100: SampleRate = SampleRate(44_100);
481 if min_r <= HZ_44100 && HZ_44100 <= max_r {
482 format.sample_rate = HZ_44100;
483 }
484 Ok(format)
485 }
486 None => Err(DefaultStreamConfigError::StreamTypeNotSupported),
487 }
488 }
489
default_input_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError>490 fn default_input_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
491 self.default_config(alsa::Direction::Capture)
492 }
493
default_output_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError>494 fn default_output_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
495 self.default_config(alsa::Direction::Playback)
496 }
497 }
498
// State shared between a `Stream` handle and the worker thread that services it.
struct StreamInner {
    // The ALSA channel.
    channel: alsa::pcm::PCM,

    // When converting between file descriptors and `snd_pcm_t`, this is the number of
    // file descriptors that this `snd_pcm_t` uses.
    num_descriptors: usize,

    // Format of the samples.
    sample_format: SampleFormat,

    // The configuration used to open this stream.
    conf: StreamConfig,

    // Minimum number of samples to put in the buffer. The polling code does not hand data to
    // the callbacks until at least this many samples are available.
    period_len: usize,

    #[allow(dead_code)]
    // Whether or not the hardware supports pausing the stream.
    // TODO: We need an API to expose this. See #197, #284.
    can_pause: bool,

    // In the case that the device does not return valid timestamps via `get_htstamp`, this field
    // will be `Some` and will contain an `Instant` representing the moment the stream was created.
    //
    // If this field is `Some`, then the stream will use the duration since this instant as a
    // source for timestamps.
    //
    // If this field is `None` then the elapsed duration between `get_trigger_htstamp` and
    // `get_htstamp` is used.
    creation_instant: Option<std::time::Instant>,
}
531
// SAFETY (assumption, not verified here): the ALSA library is assumed to be built with its
// thread-safe option, so sharing the contained PCM handle between threads is treated as sound.
unsafe impl Sync for StreamInner {}
534
/// Direction of a stream as derived from the poll flags ALSA reports for it.
#[derive(Debug, Eq, PartialEq)]
enum StreamType {
    /// The descriptors reported readiness for reading (capture).
    Input,
    /// The descriptors reported readiness for writing (playback).
    Output,
}
540
/// A running ALSA stream: owns the worker thread and the means to stop it.
///
/// Dropping the stream signals the worker through `trigger` and joins the thread.
pub struct Stream {
    /// The high-priority audio processing thread calling callbacks.
    /// Option used for moving out in destructor.
    thread: Option<JoinHandle<()>>,

    /// Handle to the underlying stream for playback controls.
    inner: Arc<StreamInner>,

    /// Used to signal to stop processing.
    trigger: TriggerSender,
}
552
/// Scratch state owned by a stream worker and reused across loop iterations.
#[derive(Default)]
struct StreamWorkerContext {
    /// Poll descriptors, rebuilt on every poll: the self-pipe first, then ALSA's descriptors.
    descriptors: Vec<libc::pollfd>,
    /// Raw sample bytes exchanged with ALSA; resized to the available data before each use.
    buffer: Vec<u8>,
}
558
input_stream_worker( rx: TriggerReceiver, stream: &StreamInner, data_callback: &mut (dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static), error_callback: &mut (dyn FnMut(StreamError) + Send + 'static), )559 fn input_stream_worker(
560 rx: TriggerReceiver,
561 stream: &StreamInner,
562 data_callback: &mut (dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static),
563 error_callback: &mut (dyn FnMut(StreamError) + Send + 'static),
564 ) {
565 let mut ctxt = StreamWorkerContext::default();
566 loop {
567 let flow = report_error(
568 poll_descriptors_and_prepare_buffer(&rx, stream, &mut ctxt),
569 error_callback,
570 )
571 .unwrap_or(PollDescriptorsFlow::Continue);
572
573 match flow {
574 PollDescriptorsFlow::Continue => {
575 continue;
576 }
577 PollDescriptorsFlow::XRun => {
578 report_error(stream.channel.prepare(), error_callback);
579 continue;
580 }
581 PollDescriptorsFlow::Return => return,
582 PollDescriptorsFlow::Ready {
583 status,
584 avail_frames: _,
585 delay_frames,
586 stream_type,
587 } => {
588 assert_eq!(
589 stream_type,
590 StreamType::Input,
591 "expected input stream, but polling descriptors indicated output",
592 );
593 let res = process_input(
594 stream,
595 &mut ctxt.buffer,
596 status,
597 delay_frames,
598 data_callback,
599 );
600 report_error(res, error_callback);
601 }
602 }
603 }
604 }
605
output_stream_worker( rx: TriggerReceiver, stream: &StreamInner, data_callback: &mut (dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static), error_callback: &mut (dyn FnMut(StreamError) + Send + 'static), )606 fn output_stream_worker(
607 rx: TriggerReceiver,
608 stream: &StreamInner,
609 data_callback: &mut (dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static),
610 error_callback: &mut (dyn FnMut(StreamError) + Send + 'static),
611 ) {
612 let mut ctxt = StreamWorkerContext::default();
613 loop {
614 let flow = report_error(
615 poll_descriptors_and_prepare_buffer(&rx, stream, &mut ctxt),
616 error_callback,
617 )
618 .unwrap_or(PollDescriptorsFlow::Continue);
619
620 match flow {
621 PollDescriptorsFlow::Continue => continue,
622 PollDescriptorsFlow::XRun => {
623 report_error(stream.channel.prepare(), error_callback);
624 continue;
625 }
626 PollDescriptorsFlow::Return => return,
627 PollDescriptorsFlow::Ready {
628 status,
629 avail_frames,
630 delay_frames,
631 stream_type,
632 } => {
633 assert_eq!(
634 stream_type,
635 StreamType::Output,
636 "expected output stream, but polling descriptors indicated input",
637 );
638 let res = process_output(
639 stream,
640 &mut ctxt.buffer,
641 status,
642 avail_frames,
643 delay_frames,
644 data_callback,
645 error_callback,
646 );
647 report_error(res, error_callback);
648 }
649 }
650 }
651 }
652
report_error<T, E>( result: Result<T, E>, error_callback: &mut (dyn FnMut(StreamError) + Send + 'static), ) -> Option<T> where E: Into<StreamError>,653 fn report_error<T, E>(
654 result: Result<T, E>,
655 error_callback: &mut (dyn FnMut(StreamError) + Send + 'static),
656 ) -> Option<T>
657 where
658 E: Into<StreamError>,
659 {
660 match result {
661 Ok(val) => Some(val),
662 Err(err) => {
663 error_callback(err.into());
664 None
665 }
666 }
667 }
668
/// Outcome of one polling round, telling the worker loop what to do next.
enum PollDescriptorsFlow {
    /// Nothing to process yet; poll again.
    Continue,
    /// The stream was asked to shut down; the worker should return.
    Return,
    /// Data can be processed; the worker's buffer has been sized accordingly.
    Ready {
        /// Direction indicated by the poll flags.
        stream_type: StreamType,
        /// PCM status captured for this round.
        status: alsa::pcm::Status,
        /// Number of frames ALSA reported as available.
        avail_frames: usize,
        /// Delay in frames taken from `status`; negative values are reported as 0.
        delay_frames: usize,
    },
    /// Querying the available frames failed with `EPIPE`; the worker re-prepares the channel.
    XRun,
}
680
681 // This block is shared between both input and output stream worker functions.
poll_descriptors_and_prepare_buffer( rx: &TriggerReceiver, stream: &StreamInner, ctxt: &mut StreamWorkerContext, ) -> Result<PollDescriptorsFlow, BackendSpecificError>682 fn poll_descriptors_and_prepare_buffer(
683 rx: &TriggerReceiver,
684 stream: &StreamInner,
685 ctxt: &mut StreamWorkerContext,
686 ) -> Result<PollDescriptorsFlow, BackendSpecificError> {
687 let StreamWorkerContext {
688 ref mut descriptors,
689 ref mut buffer,
690 } = *ctxt;
691
692 descriptors.clear();
693
694 // Add the self-pipe for signaling termination.
695 descriptors.push(libc::pollfd {
696 fd: rx.0,
697 events: libc::POLLIN,
698 revents: 0,
699 });
700
701 // Add ALSA polling fds.
702 let len = descriptors.len();
703 descriptors.resize(
704 stream.num_descriptors + len,
705 libc::pollfd {
706 fd: 0,
707 events: 0,
708 revents: 0,
709 },
710 );
711 let filled = stream.channel.fill(&mut descriptors[len..])?;
712 debug_assert_eq!(filled, stream.num_descriptors);
713
714 // Don't timeout, wait forever.
715 let res = alsa::poll::poll(descriptors, -1)?;
716 if res == 0 {
717 let description = String::from("`alsa::poll()` spuriously returned");
718 return Err(BackendSpecificError { description });
719 }
720
721 if descriptors[0].revents != 0 {
722 // The stream has been requested to be destroyed.
723 rx.clear_pipe();
724 return Ok(PollDescriptorsFlow::Return);
725 }
726
727 let stream_type = match stream.channel.revents(&descriptors[1..])? {
728 alsa::poll::Flags::OUT => StreamType::Output,
729 alsa::poll::Flags::IN => StreamType::Input,
730 _ => {
731 // Nothing to process, poll again
732 return Ok(PollDescriptorsFlow::Continue);
733 }
734 };
735
736 let status = stream.channel.status()?;
737 let avail_frames = match stream.channel.avail() {
738 Err(err) if err.errno() == Some(nix::errno::Errno::EPIPE) => {
739 return Ok(PollDescriptorsFlow::XRun)
740 }
741 res => res,
742 }? as usize;
743 let delay_frames = match status.get_delay() {
744 // Buffer underrun. TODO: Notify the user.
745 d if d < 0 => 0,
746 d => d as usize,
747 };
748 let available_samples = avail_frames * stream.conf.channels as usize;
749
750 // Only go on if there is at least `stream.period_len` samples.
751 if available_samples < stream.period_len {
752 return Ok(PollDescriptorsFlow::Continue);
753 }
754
755 // Prepare the data buffer.
756 let buffer_size = stream.sample_format.sample_size() * available_samples;
757 buffer.resize(buffer_size, 0u8);
758
759 Ok(PollDescriptorsFlow::Ready {
760 stream_type,
761 status,
762 avail_frames,
763 delay_frames,
764 })
765 }
766
767 // Read input data from ALSA and deliver it to the user.
process_input( stream: &StreamInner, buffer: &mut [u8], status: alsa::pcm::Status, delay_frames: usize, data_callback: &mut (dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static), ) -> Result<(), BackendSpecificError>768 fn process_input(
769 stream: &StreamInner,
770 buffer: &mut [u8],
771 status: alsa::pcm::Status,
772 delay_frames: usize,
773 data_callback: &mut (dyn FnMut(&Data, &InputCallbackInfo) + Send + 'static),
774 ) -> Result<(), BackendSpecificError> {
775 stream.channel.io_bytes().readi(buffer)?;
776 let sample_format = stream.sample_format;
777 let data = buffer.as_mut_ptr() as *mut ();
778 let len = buffer.len() / sample_format.sample_size();
779 let data = unsafe { Data::from_parts(data, len, sample_format) };
780 let callback = stream_timestamp(&status, stream.creation_instant)?;
781 let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate);
782 let capture = callback
783 .sub(delay_duration)
784 .expect("`capture` is earlier than representation supported by `StreamInstant`");
785 let timestamp = crate::InputStreamTimestamp { callback, capture };
786 let info = crate::InputCallbackInfo { timestamp };
787 data_callback(&data, &info);
788
789 Ok(())
790 }
791
792 // Request data from the user's function and write it via ALSA.
793 //
794 // Returns `true`
process_output( stream: &StreamInner, buffer: &mut [u8], status: alsa::pcm::Status, available_frames: usize, delay_frames: usize, data_callback: &mut (dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static), error_callback: &mut dyn FnMut(StreamError), ) -> Result<(), BackendSpecificError>795 fn process_output(
796 stream: &StreamInner,
797 buffer: &mut [u8],
798 status: alsa::pcm::Status,
799 available_frames: usize,
800 delay_frames: usize,
801 data_callback: &mut (dyn FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static),
802 error_callback: &mut dyn FnMut(StreamError),
803 ) -> Result<(), BackendSpecificError> {
804 {
805 // We're now sure that we're ready to write data.
806 let sample_format = stream.sample_format;
807 let data = buffer.as_mut_ptr() as *mut ();
808 let len = buffer.len() / sample_format.sample_size();
809 let mut data = unsafe { Data::from_parts(data, len, sample_format) };
810 let callback = stream_timestamp(&status, stream.creation_instant)?;
811 let delay_duration = frames_to_duration(delay_frames, stream.conf.sample_rate);
812 let playback = callback
813 .add(delay_duration)
814 .expect("`playback` occurs beyond representation supported by `StreamInstant`");
815 let timestamp = crate::OutputStreamTimestamp { callback, playback };
816 let info = crate::OutputCallbackInfo { timestamp };
817 data_callback(&mut data, &info);
818 }
819 loop {
820 match stream.channel.io_bytes().writei(buffer) {
821 Err(err) if err.errno() == Some(nix::errno::Errno::EPIPE) => {
822 // buffer underrun
823 // TODO: Notify the user of this.
824 let _ = stream.channel.try_recover(err, false);
825 }
826 Err(err) => {
827 error_callback(err.into());
828 continue;
829 }
830 Ok(result) if result != available_frames => {
831 let description = format!(
832 "unexpected number of frames written: expected {}, \
833 result {} (this should never happen)",
834 available_frames, result,
835 );
836 error_callback(BackendSpecificError { description }.into());
837 continue;
838 }
839 _ => {
840 break;
841 }
842 }
843 }
844 Ok(())
845 }
846
847 // Use the elapsed duration since the start of the stream.
848 //
849 // This ensures positive values that are compatible with our `StreamInstant` representation.
stream_timestamp( status: &alsa::pcm::Status, creation_instant: Option<std::time::Instant>, ) -> Result<crate::StreamInstant, BackendSpecificError>850 fn stream_timestamp(
851 status: &alsa::pcm::Status,
852 creation_instant: Option<std::time::Instant>,
853 ) -> Result<crate::StreamInstant, BackendSpecificError> {
854 match creation_instant {
855 None => {
856 let trigger_ts = status.get_trigger_htstamp();
857 let ts = status.get_htstamp();
858 let nanos = timespec_diff_nanos(ts, trigger_ts);
859 if nanos < 0 {
860 panic!(
861 "get_htstamp `{:?}` was earlier than get_trigger_htstamp `{:?}`",
862 ts, trigger_ts
863 );
864 }
865 Ok(crate::StreamInstant::from_nanos(nanos))
866 }
867 Some(creation) => {
868 let now = std::time::Instant::now();
869 let duration = now.duration_since(creation);
870 let instant = crate::StreamInstant::from_nanos_i128(duration.as_nanos() as i128)
871 .expect("stream duration has exceeded `StreamInstant` representation");
872 Ok(instant)
873 }
874 }
875 }
876
877 // Adapted from `timestamp2ns` here:
878 // https://fossies.org/linux/alsa-lib/test/audio_time.c
timespec_to_nanos(ts: libc::timespec) -> i64879 fn timespec_to_nanos(ts: libc::timespec) -> i64 {
880 ts.tv_sec as i64 * 1_000_000_000 + ts.tv_nsec as i64
881 }
882
883 // Adapted from `timediff` here:
884 // https://fossies.org/linux/alsa-lib/test/audio_time.c
timespec_diff_nanos(a: libc::timespec, b: libc::timespec) -> i64885 fn timespec_diff_nanos(a: libc::timespec, b: libc::timespec) -> i64 {
886 timespec_to_nanos(a) - timespec_to_nanos(b)
887 }
888
889 // Convert the given duration in frames at the given sample rate to a `std::time::Duration`.
frames_to_duration(frames: usize, rate: crate::SampleRate) -> std::time::Duration890 fn frames_to_duration(frames: usize, rate: crate::SampleRate) -> std::time::Duration {
891 let secsf = frames as f64 / rate.0 as f64;
892 let secs = secsf as u64;
893 let nanos = ((secsf - secs as f64) * 1_000_000_000.0) as u32;
894 std::time::Duration::new(secs, nanos)
895 }
896
897 impl Stream {
new_input<D, E>( inner: Arc<StreamInner>, mut data_callback: D, mut error_callback: E, ) -> Stream where D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, E: FnMut(StreamError) + Send + 'static,898 fn new_input<D, E>(
899 inner: Arc<StreamInner>,
900 mut data_callback: D,
901 mut error_callback: E,
902 ) -> Stream
903 where
904 D: FnMut(&Data, &InputCallbackInfo) + Send + 'static,
905 E: FnMut(StreamError) + Send + 'static,
906 {
907 let (tx, rx) = trigger();
908 // Clone the handle for passing into worker thread.
909 let stream = inner.clone();
910 let thread = thread::Builder::new()
911 .name("cpal_alsa_in".to_owned())
912 .spawn(move || {
913 input_stream_worker(rx, &*stream, &mut data_callback, &mut error_callback);
914 })
915 .unwrap();
916 Stream {
917 thread: Some(thread),
918 inner,
919 trigger: tx,
920 }
921 }
922
new_output<D, E>( inner: Arc<StreamInner>, mut data_callback: D, mut error_callback: E, ) -> Stream where D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, E: FnMut(StreamError) + Send + 'static,923 fn new_output<D, E>(
924 inner: Arc<StreamInner>,
925 mut data_callback: D,
926 mut error_callback: E,
927 ) -> Stream
928 where
929 D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static,
930 E: FnMut(StreamError) + Send + 'static,
931 {
932 let (tx, rx) = trigger();
933 // Clone the handle for passing into worker thread.
934 let stream = inner.clone();
935 let thread = thread::Builder::new()
936 .name("cpal_alsa_out".to_owned())
937 .spawn(move || {
938 output_stream_worker(rx, &*stream, &mut data_callback, &mut error_callback);
939 })
940 .unwrap();
941 Stream {
942 thread: Some(thread),
943 inner,
944 trigger: tx,
945 }
946 }
947 }
948
949 impl Drop for Stream {
drop(&mut self)950 fn drop(&mut self) {
951 self.trigger.wakeup();
952 self.thread.take().unwrap().join().unwrap();
953 }
954 }
955
956 impl StreamTrait for Stream {
play(&self) -> Result<(), PlayStreamError>957 fn play(&self) -> Result<(), PlayStreamError> {
958 self.inner.channel.pause(false).ok();
959 Ok(())
960 }
pause(&self) -> Result<(), PauseStreamError>961 fn pause(&self) -> Result<(), PauseStreamError> {
962 self.inner.channel.pause(true).ok();
963 Ok(())
964 }
965 }
966
set_hw_params_from_format( pcm_handle: &alsa::pcm::PCM, config: &StreamConfig, sample_format: SampleFormat, ) -> Result<bool, BackendSpecificError>967 fn set_hw_params_from_format(
968 pcm_handle: &alsa::pcm::PCM,
969 config: &StreamConfig,
970 sample_format: SampleFormat,
971 ) -> Result<bool, BackendSpecificError> {
972 let hw_params = alsa::pcm::HwParams::any(pcm_handle)?;
973 hw_params.set_access(alsa::pcm::Access::RWInterleaved)?;
974
975 let sample_format = if cfg!(target_endian = "big") {
976 match sample_format {
977 SampleFormat::I16 => alsa::pcm::Format::S16BE,
978 SampleFormat::U16 => alsa::pcm::Format::U16BE,
979 SampleFormat::F32 => alsa::pcm::Format::FloatBE,
980 }
981 } else {
982 match sample_format {
983 SampleFormat::I16 => alsa::pcm::Format::S16LE,
984 SampleFormat::U16 => alsa::pcm::Format::U16LE,
985 SampleFormat::F32 => alsa::pcm::Format::FloatLE,
986 }
987 };
988
989 hw_params.set_format(sample_format)?;
990 hw_params.set_rate(config.sample_rate.0, alsa::ValueOr::Nearest)?;
991 hw_params.set_channels(config.channels as u32)?;
992
993 match config.buffer_size {
994 BufferSize::Fixed(v) => {
995 hw_params.set_period_size_near((v / 4) as alsa::pcm::Frames, alsa::ValueOr::Nearest)?;
996 hw_params.set_buffer_size(v as alsa::pcm::Frames)?;
997 }
998 BufferSize::Default => {
999 // These values together represent a moderate latency and wakeup interval.
1000 // Without them, we are at the mercy of the device
1001 hw_params.set_period_time_near(25_000, alsa::ValueOr::Nearest)?;
1002 hw_params.set_buffer_time_near(100_000, alsa::ValueOr::Nearest)?;
1003 }
1004 }
1005
1006 pcm_handle.hw_params(&hw_params)?;
1007
1008 Ok(hw_params.can_pause())
1009 }
1010
set_sw_params_from_format( pcm_handle: &alsa::pcm::PCM, config: &StreamConfig, stream_type: alsa::Direction, ) -> Result<usize, BackendSpecificError>1011 fn set_sw_params_from_format(
1012 pcm_handle: &alsa::pcm::PCM,
1013 config: &StreamConfig,
1014 stream_type: alsa::Direction,
1015 ) -> Result<usize, BackendSpecificError> {
1016 let sw_params = pcm_handle.sw_params_current()?;
1017
1018 let period_len = {
1019 let (buffer, period) = pcm_handle.get_params()?;
1020 if buffer == 0 {
1021 return Err(BackendSpecificError {
1022 description: "initialization resulted in a null buffer".to_string(),
1023 });
1024 }
1025 sw_params.set_avail_min(period as alsa::pcm::Frames)?;
1026
1027 let start_threshold = match stream_type {
1028 alsa::Direction::Playback => buffer - period,
1029 alsa::Direction::Capture => 1,
1030 };
1031 sw_params.set_start_threshold(start_threshold.try_into().unwrap())?;
1032
1033 period as usize * config.channels as usize
1034 };
1035
1036 sw_params.set_tstamp_mode(true)?;
1037 sw_params.set_tstamp_type(alsa::pcm::TstampType::MonotonicRaw)?;
1038
1039 pcm_handle.sw_params(&sw_params)?;
1040
1041 Ok(period_len)
1042 }
1043
1044 impl From<alsa::Error> for BackendSpecificError {
from(err: alsa::Error) -> Self1045 fn from(err: alsa::Error) -> Self {
1046 BackendSpecificError {
1047 description: err.to_string(),
1048 }
1049 }
1050 }
1051
1052 impl From<alsa::Error> for BuildStreamError {
from(err: alsa::Error) -> Self1053 fn from(err: alsa::Error) -> Self {
1054 let err: BackendSpecificError = err.into();
1055 err.into()
1056 }
1057 }
1058
1059 impl From<alsa::Error> for SupportedStreamConfigsError {
from(err: alsa::Error) -> Self1060 fn from(err: alsa::Error) -> Self {
1061 let err: BackendSpecificError = err.into();
1062 err.into()
1063 }
1064 }
1065
1066 impl From<alsa::Error> for PlayStreamError {
from(err: alsa::Error) -> Self1067 fn from(err: alsa::Error) -> Self {
1068 let err: BackendSpecificError = err.into();
1069 err.into()
1070 }
1071 }
1072
1073 impl From<alsa::Error> for PauseStreamError {
from(err: alsa::Error) -> Self1074 fn from(err: alsa::Error) -> Self {
1075 let err: BackendSpecificError = err.into();
1076 err.into()
1077 }
1078 }
1079
1080 impl From<alsa::Error> for StreamError {
from(err: alsa::Error) -> Self1081 fn from(err: alsa::Error) -> Self {
1082 let err: BackendSpecificError = err.into();
1083 err.into()
1084 }
1085 }
1086