use super::check_result;
use super::winapi::shared::basetsd::{UINT32, UINT64};
use super::winapi::shared::minwindef::{BYTE, FALSE, WORD};
use super::winapi::um::audioclient::{self, AUDCLNT_E_DEVICE_INVALIDATED, AUDCLNT_S_BUFFER_EMPTY};
use super::winapi::um::handleapi;
use super::winapi::um::synchapi;
use super::winapi::um::winbase;
use super::winapi::um::winnt;
use crate::traits::StreamTrait;
use crate::{
    BackendSpecificError, Data, InputCallbackInfo, OutputCallbackInfo, PauseStreamError,
    PlayStreamError, SampleFormat, StreamError,
};
use std::mem;
use std::ptr;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

pub struct Stream {
    /// The high-priority audio processing thread calling callbacks.
    /// Option used for moving out in destructor.
    ///
    /// TODO: Actually set the thread priority.
    thread: Option<JoinHandle<()>>,

    // Commands processed by the run loop (`run_input`/`run_output`) on the audio thread.
    // `pending_scheduled_event` must be signalled whenever a command is added here, so that it
    // will get picked up.
    commands: Sender<Command>,

    // This event is signalled after a new entry is added to `commands`, so that the run loop
    // can be notified.
    pending_scheduled_event: winnt::HANDLE,
}

struct RunContext {
    // The stream that was created for this audio thread.
    stream: StreamInner,

    // Handles that the audio thread waits on. The first element is always
    // `pending_scheduled_event`, followed by the `event` handle of `stream`.
    handles: Vec<winnt::HANDLE>,

    commands: Receiver<Command>,
}

// Once the audio thread starts running, the `RunContext` will not be moved.
unsafe impl Send for RunContext {}

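// Commands sent from the `Stream` handle to its audio thread. Each command must be followed by
// signalling `pending_scheduled_event` so that the thread wakes up and processes it.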
pub enum Command {
    PlayStream,
    PauseStream,
    Terminate,
}

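// Whether the stream renders audio to the device or captures audio from it, along with the
// corresponding WASAPI service interface.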
pub enum AudioClientFlow {
    Render {
        render_client: *mut audioclient::IAudioRenderClient,
    },
    Capture {
        capture_client: *mut audioclient::IAudioCaptureClient,
    },
}

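// Stream state moved onto the audio thread. Owns the raw WASAPI COM pointers and the event
// handle, which are released/closed when the stream is dropped.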
pub struct StreamInner {
    pub audio_client: *mut audioclient::IAudioClient,
    pub audio_clock: *mut audioclient::IAudioClock,
    pub client_flow: AudioClientFlow,
    // Event that is signalled by WASAPI whenever audio data must be written (output) or is
    // ready to be read (input).
    pub event: winnt::HANDLE,
    // True if the stream is currently playing. False if paused.
    pub playing: bool,
    // Number of frames of audio data in the underlying buffer allocated by WASAPI.
    pub max_frames_in_buffer: UINT32,
    // Number of bytes that each frame occupies.
    pub bytes_per_frame: WORD,
    // The configuration with which the stream was created.
    pub config: crate::StreamConfig,
    // The sample format with which the stream was created.
    pub sample_format: SampleFormat,
}

impl Stream {
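    /// Create an input (capture) stream and spawn the thread that waits on WASAPI events and
    /// invokes the callbacks.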
    pub(crate) fn new_input<D, E>(
        stream_inner: StreamInner,
        mut data_callback: D,
        mut error_callback: E,
    ) -> Stream
    where
        D: FnMut(&Data, &InputCallbackInfo) + Send + 'static,
        E: FnMut(StreamError) + Send + 'static,
    {
        let pending_scheduled_event =
            unsafe { synchapi::CreateEventA(ptr::null_mut(), 0, 0, ptr::null()) };
        let (tx, rx) = channel();

        let run_context = RunContext {
            handles: vec![pending_scheduled_event, stream_inner.event],
            stream: stream_inner,
            commands: rx,
        };

        let thread = thread::Builder::new()
            .name("cpal_wasapi_in".to_owned())
            .spawn(move || run_input(run_context, &mut data_callback, &mut error_callback))
            .unwrap();

        Stream {
            thread: Some(thread),
            commands: tx,
            pending_scheduled_event,
        }
    }

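    /// Create an output (render) stream and spawn the thread that waits on WASAPI events and
    /// invokes the callbacks.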
    pub(crate) fn new_output<D, E>(
        stream_inner: StreamInner,
        mut data_callback: D,
        mut error_callback: E,
    ) -> Stream
    where
        D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static,
        E: FnMut(StreamError) + Send + 'static,
    {
        let pending_scheduled_event =
            unsafe { synchapi::CreateEventA(ptr::null_mut(), 0, 0, ptr::null()) };
        let (tx, rx) = channel();

        let run_context = RunContext {
            handles: vec![pending_scheduled_event, stream_inner.event],
            stream: stream_inner,
            commands: rx,
        };

        let thread = thread::Builder::new()
            .name("cpal_wasapi_out".to_owned())
            .spawn(move || run_output(run_context, &mut data_callback, &mut error_callback))
            .unwrap();

        Stream {
            thread: Some(thread),
            commands: tx,
            pending_scheduled_event,
        }
    }

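    /// Queue a command for the audio thread and signal `pending_scheduled_event` so that the
    /// thread wakes up and processes it.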
    #[inline]
    fn push_command(&self, command: Command) {
        // Sender generally outlives receiver, unless the device gets unplugged.
        let _ = self.commands.send(command);
        unsafe {
            let result = synchapi::SetEvent(self.pending_scheduled_event);
            assert_ne!(result, 0);
        }
    }
}

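// Dropping the `Stream` asks the audio thread to terminate, joins it, and then closes the event
// handle used to wake it.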
impl Drop for Stream {
    #[inline]
    fn drop(&mut self) {
        self.push_command(Command::Terminate);
        self.thread.take().unwrap().join().unwrap();
        unsafe {
            handleapi::CloseHandle(self.pending_scheduled_event);
        }
    }
}

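// `play` and `pause` only enqueue commands; the audio thread applies them asynchronously, so any
// failure of `IAudioClient::Start`/`Stop` is reported through the error callback instead.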
impl StreamTrait for Stream {
    fn play(&self) -> Result<(), PlayStreamError> {
        self.push_command(Command::PlayStream);
        Ok(())
    }
    fn pause(&self) -> Result<(), PauseStreamError> {
        self.push_command(Command::PauseStream);
        Ok(())
    }
}

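// Release the COM interface owned by the active variant.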
impl Drop for AudioClientFlow {
    fn drop(&mut self) {
        unsafe {
            match *self {
                AudioClientFlow::Capture { capture_client } => (*capture_client).Release(),
                AudioClientFlow::Render { render_client } => (*render_client).Release(),
            };
        }
    }
}

impl Drop for StreamInner {
    #[inline]
    fn drop(&mut self) {
        unsafe {
            (*self.audio_client).Release();
            (*self.audio_clock).Release();
            handleapi::CloseHandle(self.event);
        }
    }
}

// Process any pending commands that are queued within the `RunContext`.
// Returns `true` if the loop should continue running, `false` if it should terminate.
fn process_commands(run_context: &mut RunContext) -> Result<bool, StreamError> {
    // Process the pending commands.
    for command in run_context.commands.try_iter() {
        match command {
            Command::PlayStream => {
                if !run_context.stream.playing {
                    let hresult = unsafe { (*run_context.stream.audio_client).Start() };

                    if let Err(err) = stream_error_from_hresult(hresult) {
                        return Err(err);
                    }
                    run_context.stream.playing = true;
                }
            }
            Command::PauseStream => {
                if run_context.stream.playing {
                    let hresult = unsafe { (*run_context.stream.audio_client).Stop() };
                    if let Err(err) = stream_error_from_hresult(hresult) {
                        return Err(err);
                    }
                    run_context.stream.playing = false;
                }
            }
            Command::Terminate => {
                return Ok(false);
            }
        }
    }

    Ok(true)
}

// Wait for any of the given handles to be signalled.
//
// Returns the index of the `handle` that was signalled, or an `Err` if
// `WaitForMultipleObjectsEx` fails.
//
// This is called when the audio thread is ready to wait for the next event. The
// next event might be some command submitted by the user (the first handle) or
// might indicate that the stream is ready to deliver or receive audio.
fn wait_for_handle_signal(handles: &[winnt::HANDLE]) -> Result<usize, BackendSpecificError> {
    debug_assert!(handles.len() <= winnt::MAXIMUM_WAIT_OBJECTS as usize);
    let result = unsafe {
        synchapi::WaitForMultipleObjectsEx(
            handles.len() as u32,
            handles.as_ptr(),
            FALSE,             // Don't wait for all, just wait for the first
            winbase::INFINITE, // TODO: allow setting a timeout
            FALSE,             // irrelevant parameter here
        )
    };
    if result == winbase::WAIT_FAILED {
        let err = unsafe { winapi::um::errhandlingapi::GetLastError() };
        let description = format!("`WaitForMultipleObjectsEx` failed: {}", err);
        let err = BackendSpecificError { description };
        return Err(err);
    }
    // Map the result back to an index into `handles`.
    let handle_idx = (result - winbase::WAIT_OBJECT_0) as usize;
    Ok(handle_idx)
}


// Get the number of frames that are available for writing/reading.
fn get_available_frames(stream: &StreamInner) -> Result<u32, StreamError> {
    unsafe {
        let mut padding = 0u32;
        let hresult = (*stream.audio_client).GetCurrentPadding(&mut padding);
        stream_error_from_hresult(hresult)?;
        Ok(stream.max_frames_in_buffer - padding)
    }
}

// Convert the given `HRESULT` into a `StreamError` if it does indicate an error.
fn stream_error_from_hresult(hresult: winnt::HRESULT) -> Result<(), StreamError> {
    if hresult == AUDCLNT_E_DEVICE_INVALIDATED {
        return Err(StreamError::DeviceNotAvailable);
    }
    if let Err(err) = check_result(hresult) {
        let description = format!("{}", err);
        let err = BackendSpecificError { description };
        return Err(err.into());
    }
    Ok(())
}

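// The input run loop: process pending commands, wait for the next event, and forward captured
// audio to `data_callback` until a `Terminate` command arrives or an error occurs.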
fn run_input(
    mut run_ctxt: RunContext,
    data_callback: &mut dyn FnMut(&Data, &InputCallbackInfo),
    error_callback: &mut dyn FnMut(StreamError),
) {
    loop {
        match process_commands_and_await_signal(&mut run_ctxt, error_callback) {
            Some(ControlFlow::Break) => break,
            Some(ControlFlow::Continue) => continue,
            None => (),
        }
        let capture_client = match run_ctxt.stream.client_flow {
            AudioClientFlow::Capture { capture_client } => capture_client,
            _ => unreachable!(),
        };
        match process_input(
            &mut run_ctxt.stream,
            capture_client,
            data_callback,
            error_callback,
        ) {
            ControlFlow::Break => break,
            ControlFlow::Continue => continue,
        }
    }
}

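// The output run loop: same structure as `run_input`, but fills the render buffer through
// `data_callback` instead of delivering captured data.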
fn run_output(
    mut run_ctxt: RunContext,
    data_callback: &mut dyn FnMut(&mut Data, &OutputCallbackInfo),
    error_callback: &mut dyn FnMut(StreamError),
) {
    loop {
        match process_commands_and_await_signal(&mut run_ctxt, error_callback) {
            Some(ControlFlow::Break) => break,
            Some(ControlFlow::Continue) => continue,
            None => (),
        }
        let render_client = match run_ctxt.stream.client_flow {
            AudioClientFlow::Render { render_client } => render_client,
            _ => unreachable!(),
        };
        match process_output(
            &mut run_ctxt.stream,
            render_client,
            data_callback,
            error_callback,
        ) {
            ControlFlow::Break => break,
            ControlFlow::Continue => continue,
        }
    }
}

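// Whether the calling run loop should keep iterating or terminate.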
enum ControlFlow {
    Break,
    Continue,
}

fn process_commands_and_await_signal(
    run_context: &mut RunContext,
    error_callback: &mut dyn FnMut(StreamError),
) -> Option<ControlFlow> {
    // Process queued commands.
    match process_commands(run_context) {
        Ok(true) => (),
        Ok(false) => return Some(ControlFlow::Break),
        Err(err) => {
            error_callback(err);
            return Some(ControlFlow::Break);
        }
    };

    // Wait for any of the handles to be signalled.
    let handle_idx = match wait_for_handle_signal(&run_context.handles) {
        Ok(idx) => idx,
        Err(err) => {
            error_callback(err.into());
            return Some(ControlFlow::Break);
        }
    };

    // If `handle_idx` is 0, then it's `pending_scheduled_event` that was signalled in
    // order for us to pick up the pending commands. Otherwise, a stream needs data.
    if handle_idx == 0 {
        return Some(ControlFlow::Continue);
    }

    None
}

// The loop for processing pending input data.
fn process_input(
    stream: &StreamInner,
    capture_client: *mut audioclient::IAudioCaptureClient,
    data_callback: &mut dyn FnMut(&Data, &InputCallbackInfo),
    error_callback: &mut dyn FnMut(StreamError),
) -> ControlFlow {
    let mut frames_available = 0;
    unsafe {
        // Get the available data in the shared buffer.
        let mut buffer: *mut BYTE = ptr::null_mut();
        let mut flags = mem::MaybeUninit::uninit();
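        // Note: the buffer flags written by `GetBuffer` (e.g. silence/discontinuity) are
        // currently not inspected.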
        loop {
            let hresult = (*capture_client).GetNextPacketSize(&mut frames_available);
            if let Err(err) = stream_error_from_hresult(hresult) {
                error_callback(err);
                return ControlFlow::Break;
            }
            if frames_available == 0 {
                return ControlFlow::Continue;
            }
            let mut qpc_position: UINT64 = 0;
            let hresult = (*capture_client).GetBuffer(
                &mut buffer,
                &mut frames_available,
                flags.as_mut_ptr(),
                ptr::null_mut(),
                &mut qpc_position,
            );

            // TODO: Can this happen?
            if hresult == AUDCLNT_S_BUFFER_EMPTY {
                continue;
            } else if let Err(err) = stream_error_from_hresult(hresult) {
                error_callback(err);
                return ControlFlow::Break;
            }

            debug_assert!(!buffer.is_null());

            let data = buffer as *mut ();
            let len = frames_available as usize * stream.bytes_per_frame as usize
                / stream.sample_format.sample_size();
            let data = Data::from_parts(data, len, stream.sample_format);

            // The `qpc_position` is in 100 nanosecond units. Convert it to nanoseconds.
            let timestamp = match input_timestamp(stream, qpc_position) {
                Ok(ts) => ts,
                Err(err) => {
                    error_callback(err);
                    return ControlFlow::Break;
                }
            };
            let info = InputCallbackInfo { timestamp };
            data_callback(&data, &info);

            // Release the buffer.
            let hresult = (*capture_client).ReleaseBuffer(frames_available);
            if let Err(err) = stream_error_from_hresult(hresult) {
                error_callback(err);
                return ControlFlow::Break;
            }
        }
    }
}

// The loop for writing output data.
fn process_output(
    stream: &StreamInner,
    render_client: *mut audioclient::IAudioRenderClient,
    data_callback: &mut dyn FnMut(&mut Data, &OutputCallbackInfo),
    error_callback: &mut dyn FnMut(StreamError),
) -> ControlFlow {
    // The number of frames available for writing.
    let frames_available = match get_available_frames(stream) {
        Ok(0) => return ControlFlow::Continue, // TODO: Can this happen?
        Ok(n) => n,
        Err(err) => {
            error_callback(err);
            return ControlFlow::Break;
        }
    };

    unsafe {
        let mut buffer: *mut BYTE = ptr::null_mut();
        let hresult = (*render_client).GetBuffer(frames_available, &mut buffer as *mut *mut _);

        if let Err(err) = stream_error_from_hresult(hresult) {
            error_callback(err);
            return ControlFlow::Break;
        }

        debug_assert!(!buffer.is_null());

        let data = buffer as *mut ();
        let len = frames_available as usize * stream.bytes_per_frame as usize
            / stream.sample_format.sample_size();
        let mut data = Data::from_parts(data, len, stream.sample_format);
        let sample_rate = stream.config.sample_rate;
        let timestamp = match output_timestamp(stream, frames_available, sample_rate) {
            Ok(ts) => ts,
            Err(err) => {
                error_callback(err);
                return ControlFlow::Break;
            }
        };
        let info = OutputCallbackInfo { timestamp };
        data_callback(&mut data, &info);

        let hresult = (*render_client).ReleaseBuffer(frames_available as u32, 0);
        if let Err(err) = stream_error_from_hresult(hresult) {
            error_callback(err);
            return ControlFlow::Break;
        }
    }

    ControlFlow::Continue
}

/// Convert the given number of frames at the given sample rate to a `std::time::Duration`.
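///
/// For example, 480 frames at a sample rate of 48 000 Hz correspond to 10 milliseconds.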
fn frames_to_duration(frames: u32, rate: crate::SampleRate) -> std::time::Duration {
    let secsf = frames as f64 / rate.0 as f64;
    let secs = secsf as u64;
    let nanos = ((secsf - secs as f64) * 1_000_000_000.0) as u32;
    std::time::Duration::new(secs, nanos)
}

/// Use the stream's `IAudioClock` to produce the current stream instant.
///
/// Uses the QPC position produced via the `GetPosition` method.
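///
/// Note that `GetPosition` also reports a device position, but only the QPC value is used here.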
fn stream_instant(stream: &StreamInner) -> Result<crate::StreamInstant, StreamError> {
    let mut position: UINT64 = 0;
    let mut qpc_position: UINT64 = 0;
    let res = unsafe { (*stream.audio_clock).GetPosition(&mut position, &mut qpc_position) };
    stream_error_from_hresult(res)?;
    // The `qpc_position` is in 100 nanosecond units. Convert it to nanoseconds.
    let qpc_nanos = qpc_position as i128 * 100;
    let instant = crate::StreamInstant::from_nanos_i128(qpc_nanos)
        .expect("performance counter out of range of `StreamInstant` representation");
    Ok(instant)
}

/// Produce the input stream timestamp.
///
/// `buffer_qpc_position` is the `qpc_position` returned via the `GetBuffer` call on the capture
/// client. It represents the instant at which the first sample of the retrieved buffer was
/// captured.
fn input_timestamp(
    stream: &StreamInner,
    buffer_qpc_position: UINT64,
) -> Result<crate::InputStreamTimestamp, StreamError> {
    // The `qpc_position` is in 100 nanosecond units. Convert it to nanoseconds.
    let qpc_nanos = buffer_qpc_position as i128 * 100;
    let capture = crate::StreamInstant::from_nanos_i128(qpc_nanos)
        .expect("performance counter out of range of `StreamInstant` representation");
    let callback = stream_instant(stream)?;
    Ok(crate::InputStreamTimestamp { capture, callback })
}

/// Produce the output stream timestamp.
///
/// `frames_available` is the number of frames available for writing as reported by subtracting the
/// result of `GetCurrentPadding` from the maximum buffer size.
///
/// `sample_rate` is the rate at which audio frames are processed by the device.
///
/// TODO: The returned `playback` is an estimate that assumes audio is delivered immediately after
/// `frames_available` are consumed. In reality there is likely a small additional amount of
/// latency, but it is unclear how to determine it.
fn output_timestamp(
    stream: &StreamInner,
    frames_available: u32,
    sample_rate: crate::SampleRate,
) -> Result<crate::OutputStreamTimestamp, StreamError> {
    let callback = stream_instant(stream)?;
    let buffer_duration = frames_to_duration(frames_available, sample_rate);
    let playback = callback
        .add(buffer_duration)
        .expect("`playback` occurs beyond representation supported by `StreamInstant`");
    Ok(crate::OutputStreamTimestamp { callback, playback })
}