1 use super::check_result;
2 use super::winapi::shared::basetsd::{UINT32, UINT64};
3 use super::winapi::shared::minwindef::{BYTE, FALSE, WORD};
4 use super::winapi::um::audioclient::{self, AUDCLNT_E_DEVICE_INVALIDATED, AUDCLNT_S_BUFFER_EMPTY};
5 use super::winapi::um::handleapi;
6 use super::winapi::um::synchapi;
7 use super::winapi::um::winbase;
8 use super::winapi::um::winnt;
9 use crate::traits::StreamTrait;
10 use crate::{
11 BackendSpecificError, Data, InputCallbackInfo, OutputCallbackInfo, PauseStreamError,
12 PlayStreamError, SampleFormat, StreamError,
13 };
14 use std::mem;
15 use std::ptr;
16 use std::sync::mpsc::{channel, Receiver, Sender};
17 use std::thread::{self, JoinHandle};
18
19 pub struct Stream {
20 /// The high-priority audio processing thread calling callbacks.
21 /// Option used for moving out in destructor.
22 ///
23 /// TODO: Actually set the thread priority.
24 thread: Option<JoinHandle<()>>,
25
26 // Commands processed by the `run()` method that is currently running.
27 // `pending_scheduled_event` must be signalled whenever a command is added here, so that it
28 // will get picked up.
29 commands: Sender<Command>,
30
31 // This event is signalled after a new entry is added to `commands`, so that the `run()`
32 // method can be notified.
33 pending_scheduled_event: winnt::HANDLE,
34 }
35
36 struct RunContext {
37 // Streams that have been created in this event loop.
38 stream: StreamInner,
39
40 // Handles corresponding to the `event` field of each element of `voices`. Must always be in
41 // sync with `voices`, except that the first element is always `pending_scheduled_event`.
42 handles: Vec<winnt::HANDLE>,
43
44 commands: Receiver<Command>,
45 }
46
47 // Once we start running the eventloop, the RunContext will not be moved.
48 unsafe impl Send for RunContext {}
49
50 pub enum Command {
51 PlayStream,
52 PauseStream,
53 Terminate,
54 }
55
56 pub enum AudioClientFlow {
57 Render {
58 render_client: *mut audioclient::IAudioRenderClient,
59 },
60 Capture {
61 capture_client: *mut audioclient::IAudioCaptureClient,
62 },
63 }
64
65 pub struct StreamInner {
66 pub audio_client: *mut audioclient::IAudioClient,
67 pub audio_clock: *mut audioclient::IAudioClock,
68 pub client_flow: AudioClientFlow,
69 // Event that is signalled by WASAPI whenever audio data must be written.
70 pub event: winnt::HANDLE,
71 // True if the stream is currently playing. False if paused.
72 pub playing: bool,
73 // Number of frames of audio data in the underlying buffer allocated by WASAPI.
74 pub max_frames_in_buffer: UINT32,
75 // Number of bytes that each frame occupies.
76 pub bytes_per_frame: WORD,
77 // The configuration with which the stream was created.
78 pub config: crate::StreamConfig,
79 // The sample format with which the stream was created.
80 pub sample_format: SampleFormat,
81 }
82
83 impl Stream {
new_input<D, E>( stream_inner: StreamInner, mut data_callback: D, mut error_callback: E, ) -> Stream where D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, E: FnMut(StreamError) + Send + 'static,84 pub(crate) fn new_input<D, E>(
85 stream_inner: StreamInner,
86 mut data_callback: D,
87 mut error_callback: E,
88 ) -> Stream
89 where
90 D: FnMut(&Data, &InputCallbackInfo) + Send + 'static,
91 E: FnMut(StreamError) + Send + 'static,
92 {
93 let pending_scheduled_event =
94 unsafe { synchapi::CreateEventA(ptr::null_mut(), 0, 0, ptr::null()) };
95 let (tx, rx) = channel();
96
97 let run_context = RunContext {
98 handles: vec![pending_scheduled_event, stream_inner.event],
99 stream: stream_inner,
100 commands: rx,
101 };
102
103 let thread = thread::Builder::new()
104 .name("cpal_wasapi_in".to_owned())
105 .spawn(move || run_input(run_context, &mut data_callback, &mut error_callback))
106 .unwrap();
107
108 Stream {
109 thread: Some(thread),
110 commands: tx,
111 pending_scheduled_event,
112 }
113 }
114
new_output<D, E>( stream_inner: StreamInner, mut data_callback: D, mut error_callback: E, ) -> Stream where D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, E: FnMut(StreamError) + Send + 'static,115 pub(crate) fn new_output<D, E>(
116 stream_inner: StreamInner,
117 mut data_callback: D,
118 mut error_callback: E,
119 ) -> Stream
120 where
121 D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static,
122 E: FnMut(StreamError) + Send + 'static,
123 {
124 let pending_scheduled_event =
125 unsafe { synchapi::CreateEventA(ptr::null_mut(), 0, 0, ptr::null()) };
126 let (tx, rx) = channel();
127
128 let run_context = RunContext {
129 handles: vec![pending_scheduled_event, stream_inner.event],
130 stream: stream_inner,
131 commands: rx,
132 };
133
134 let thread = thread::Builder::new()
135 .name("cpal_wasapi_out".to_owned())
136 .spawn(move || run_output(run_context, &mut data_callback, &mut error_callback))
137 .unwrap();
138
139 Stream {
140 thread: Some(thread),
141 commands: tx,
142 pending_scheduled_event,
143 }
144 }
145
146 #[inline]
push_command(&self, command: Command)147 fn push_command(&self, command: Command) {
148 // Sender generally outlives receiver, unless the device gets unplugged.
149 let _ = self.commands.send(command);
150 unsafe {
151 let result = synchapi::SetEvent(self.pending_scheduled_event);
152 assert_ne!(result, 0);
153 }
154 }
155 }
156
157 impl Drop for Stream {
158 #[inline]
drop(&mut self)159 fn drop(&mut self) {
160 self.push_command(Command::Terminate);
161 self.thread.take().unwrap().join().unwrap();
162 unsafe {
163 handleapi::CloseHandle(self.pending_scheduled_event);
164 }
165 }
166 }
167
168 impl StreamTrait for Stream {
play(&self) -> Result<(), PlayStreamError>169 fn play(&self) -> Result<(), PlayStreamError> {
170 self.push_command(Command::PlayStream);
171 Ok(())
172 }
pause(&self) -> Result<(), PauseStreamError>173 fn pause(&self) -> Result<(), PauseStreamError> {
174 self.push_command(Command::PauseStream);
175 Ok(())
176 }
177 }
178
179 impl Drop for AudioClientFlow {
drop(&mut self)180 fn drop(&mut self) {
181 unsafe {
182 match *self {
183 AudioClientFlow::Capture { capture_client } => (*capture_client).Release(),
184 AudioClientFlow::Render { render_client } => (*render_client).Release(),
185 };
186 }
187 }
188 }
189
190 impl Drop for StreamInner {
191 #[inline]
drop(&mut self)192 fn drop(&mut self) {
193 unsafe {
194 (*self.audio_client).Release();
195 (*self.audio_clock).Release();
196 handleapi::CloseHandle(self.event);
197 }
198 }
199 }
200
201 // Process any pending commands that are queued within the `RunContext`.
202 // Returns `true` if the loop should continue running, `false` if it should terminate.
process_commands(run_context: &mut RunContext) -> Result<bool, StreamError>203 fn process_commands(run_context: &mut RunContext) -> Result<bool, StreamError> {
204 // Process the pending commands.
205 for command in run_context.commands.try_iter() {
206 match command {
207 Command::PlayStream => {
208 if !run_context.stream.playing {
209 let hresult = unsafe { (*run_context.stream.audio_client).Start() };
210
211 if let Err(err) = stream_error_from_hresult(hresult) {
212 return Err(err);
213 }
214 run_context.stream.playing = true;
215 }
216 }
217 Command::PauseStream => {
218 if run_context.stream.playing {
219 let hresult = unsafe { (*run_context.stream.audio_client).Stop() };
220 if let Err(err) = stream_error_from_hresult(hresult) {
221 return Err(err);
222 }
223 run_context.stream.playing = false;
224 }
225 }
226 Command::Terminate => {
227 return Ok(false);
228 }
229 }
230 }
231
232 Ok(true)
233 }
234 // Wait for any of the given handles to be signalled.
235 //
236 // Returns the index of the `handle` that was signalled, or an `Err` if
237 // `WaitForMultipleObjectsEx` fails.
238 //
239 // This is called when the `run` thread is ready to wait for the next event. The
240 // next event might be some command submitted by the user (the first handle) or
241 // might indicate that one of the streams is ready to deliver or receive audio.
wait_for_handle_signal(handles: &[winnt::HANDLE]) -> Result<usize, BackendSpecificError>242 fn wait_for_handle_signal(handles: &[winnt::HANDLE]) -> Result<usize, BackendSpecificError> {
243 debug_assert!(handles.len() <= winnt::MAXIMUM_WAIT_OBJECTS as usize);
244 let result = unsafe {
245 synchapi::WaitForMultipleObjectsEx(
246 handles.len() as u32,
247 handles.as_ptr(),
248 FALSE, // Don't wait for all, just wait for the first
249 winbase::INFINITE, // TODO: allow setting a timeout
250 FALSE, // irrelevant parameter here
251 )
252 };
253 if result == winbase::WAIT_FAILED {
254 let err = unsafe { winapi::um::errhandlingapi::GetLastError() };
255 let description = format!("`WaitForMultipleObjectsEx failed: {}", err);
256 let err = BackendSpecificError { description };
257 return Err(err);
258 }
259 // Notifying the corresponding task handler.
260 let handle_idx = (result - winbase::WAIT_OBJECT_0) as usize;
261 Ok(handle_idx)
262 }
263
264 // Get the number of available frames that are available for writing/reading.
get_available_frames(stream: &StreamInner) -> Result<u32, StreamError>265 fn get_available_frames(stream: &StreamInner) -> Result<u32, StreamError> {
266 unsafe {
267 let mut padding = 0u32;
268 let hresult = (*stream.audio_client).GetCurrentPadding(&mut padding);
269 stream_error_from_hresult(hresult)?;
270 Ok(stream.max_frames_in_buffer - padding)
271 }
272 }
273
274 // Convert the given `HRESULT` into a `StreamError` if it does indicate an error.
stream_error_from_hresult(hresult: winnt::HRESULT) -> Result<(), StreamError>275 fn stream_error_from_hresult(hresult: winnt::HRESULT) -> Result<(), StreamError> {
276 if hresult == AUDCLNT_E_DEVICE_INVALIDATED {
277 return Err(StreamError::DeviceNotAvailable);
278 }
279 if let Err(err) = check_result(hresult) {
280 let description = format!("{}", err);
281 let err = BackendSpecificError { description };
282 return Err(err.into());
283 }
284 Ok(())
285 }
286
run_input( mut run_ctxt: RunContext, data_callback: &mut dyn FnMut(&Data, &InputCallbackInfo), error_callback: &mut dyn FnMut(StreamError), )287 fn run_input(
288 mut run_ctxt: RunContext,
289 data_callback: &mut dyn FnMut(&Data, &InputCallbackInfo),
290 error_callback: &mut dyn FnMut(StreamError),
291 ) {
292 loop {
293 match process_commands_and_await_signal(&mut run_ctxt, error_callback) {
294 Some(ControlFlow::Break) => break,
295 Some(ControlFlow::Continue) => continue,
296 None => (),
297 }
298 let capture_client = match run_ctxt.stream.client_flow {
299 AudioClientFlow::Capture { capture_client } => capture_client,
300 _ => unreachable!(),
301 };
302 match process_input(
303 &mut run_ctxt.stream,
304 capture_client,
305 data_callback,
306 error_callback,
307 ) {
308 ControlFlow::Break => break,
309 ControlFlow::Continue => continue,
310 }
311 }
312 }
313
run_output( mut run_ctxt: RunContext, data_callback: &mut dyn FnMut(&mut Data, &OutputCallbackInfo), error_callback: &mut dyn FnMut(StreamError), )314 fn run_output(
315 mut run_ctxt: RunContext,
316 data_callback: &mut dyn FnMut(&mut Data, &OutputCallbackInfo),
317 error_callback: &mut dyn FnMut(StreamError),
318 ) {
319 loop {
320 match process_commands_and_await_signal(&mut run_ctxt, error_callback) {
321 Some(ControlFlow::Break) => break,
322 Some(ControlFlow::Continue) => continue,
323 None => (),
324 }
325 let render_client = match run_ctxt.stream.client_flow {
326 AudioClientFlow::Render { render_client } => render_client,
327 _ => unreachable!(),
328 };
329 match process_output(
330 &mut run_ctxt.stream,
331 render_client,
332 data_callback,
333 error_callback,
334 ) {
335 ControlFlow::Break => break,
336 ControlFlow::Continue => continue,
337 }
338 }
339 }
340
341 enum ControlFlow {
342 Break,
343 Continue,
344 }
345
process_commands_and_await_signal( run_context: &mut RunContext, error_callback: &mut dyn FnMut(StreamError), ) -> Option<ControlFlow>346 fn process_commands_and_await_signal(
347 run_context: &mut RunContext,
348 error_callback: &mut dyn FnMut(StreamError),
349 ) -> Option<ControlFlow> {
350 // Process queued commands.
351 match process_commands(run_context) {
352 Ok(true) => (),
353 Ok(false) => return Some(ControlFlow::Break),
354 Err(err) => {
355 error_callback(err);
356 return Some(ControlFlow::Break);
357 }
358 };
359
360 // Wait for any of the handles to be signalled.
361 let handle_idx = match wait_for_handle_signal(&run_context.handles) {
362 Ok(idx) => idx,
363 Err(err) => {
364 error_callback(err.into());
365 return Some(ControlFlow::Break);
366 }
367 };
368
369 // If `handle_idx` is 0, then it's `pending_scheduled_event` that was signalled in
370 // order for us to pick up the pending commands. Otherwise, a stream needs data.
371 if handle_idx == 0 {
372 return Some(ControlFlow::Continue);
373 }
374
375 None
376 }
377
378 // The loop for processing pending input data.
process_input( stream: &StreamInner, capture_client: *mut audioclient::IAudioCaptureClient, data_callback: &mut dyn FnMut(&Data, &InputCallbackInfo), error_callback: &mut dyn FnMut(StreamError), ) -> ControlFlow379 fn process_input(
380 stream: &StreamInner,
381 capture_client: *mut audioclient::IAudioCaptureClient,
382 data_callback: &mut dyn FnMut(&Data, &InputCallbackInfo),
383 error_callback: &mut dyn FnMut(StreamError),
384 ) -> ControlFlow {
385 let mut frames_available = 0;
386 unsafe {
387 // Get the available data in the shared buffer.
388 let mut buffer: *mut BYTE = ptr::null_mut();
389 let mut flags = mem::MaybeUninit::uninit();
390 loop {
391 let hresult = (*capture_client).GetNextPacketSize(&mut frames_available);
392 if let Err(err) = stream_error_from_hresult(hresult) {
393 error_callback(err);
394 return ControlFlow::Break;
395 }
396 if frames_available == 0 {
397 return ControlFlow::Continue;
398 }
399 let mut qpc_position: UINT64 = 0;
400 let hresult = (*capture_client).GetBuffer(
401 &mut buffer,
402 &mut frames_available,
403 flags.as_mut_ptr(),
404 ptr::null_mut(),
405 &mut qpc_position,
406 );
407
408 // TODO: Can this happen?
409 if hresult == AUDCLNT_S_BUFFER_EMPTY {
410 continue;
411 } else if let Err(err) = stream_error_from_hresult(hresult) {
412 error_callback(err);
413 return ControlFlow::Break;
414 }
415
416 debug_assert!(!buffer.is_null());
417
418 let data = buffer as *mut ();
419 let len = frames_available as usize * stream.bytes_per_frame as usize
420 / stream.sample_format.sample_size();
421 let data = Data::from_parts(data, len, stream.sample_format);
422
423 // The `qpc_position` is in 100 nanosecond units. Convert it to nanoseconds.
424 let timestamp = match input_timestamp(stream, qpc_position) {
425 Ok(ts) => ts,
426 Err(err) => {
427 error_callback(err);
428 return ControlFlow::Break;
429 }
430 };
431 let info = InputCallbackInfo { timestamp };
432 data_callback(&data, &info);
433
434 // Release the buffer.
435 let hresult = (*capture_client).ReleaseBuffer(frames_available);
436 if let Err(err) = stream_error_from_hresult(hresult) {
437 error_callback(err);
438 return ControlFlow::Break;
439 }
440 }
441 }
442 }
443
444 // The loop for writing output data.
process_output( stream: &StreamInner, render_client: *mut audioclient::IAudioRenderClient, data_callback: &mut dyn FnMut(&mut Data, &OutputCallbackInfo), error_callback: &mut dyn FnMut(StreamError), ) -> ControlFlow445 fn process_output(
446 stream: &StreamInner,
447 render_client: *mut audioclient::IAudioRenderClient,
448 data_callback: &mut dyn FnMut(&mut Data, &OutputCallbackInfo),
449 error_callback: &mut dyn FnMut(StreamError),
450 ) -> ControlFlow {
451 // The number of frames available for writing.
452 let frames_available = match get_available_frames(&stream) {
453 Ok(0) => return ControlFlow::Continue, // TODO: Can this happen?
454 Ok(n) => n,
455 Err(err) => {
456 error_callback(err);
457 return ControlFlow::Break;
458 }
459 };
460
461 unsafe {
462 let mut buffer: *mut BYTE = ptr::null_mut();
463 let hresult = (*render_client).GetBuffer(frames_available, &mut buffer as *mut *mut _);
464
465 if let Err(err) = stream_error_from_hresult(hresult) {
466 error_callback(err);
467 return ControlFlow::Break;
468 }
469
470 debug_assert!(!buffer.is_null());
471
472 let data = buffer as *mut ();
473 let len = frames_available as usize * stream.bytes_per_frame as usize
474 / stream.sample_format.sample_size();
475 let mut data = Data::from_parts(data, len, stream.sample_format);
476 let sample_rate = stream.config.sample_rate;
477 let timestamp = match output_timestamp(stream, frames_available, sample_rate) {
478 Ok(ts) => ts,
479 Err(err) => {
480 error_callback(err);
481 return ControlFlow::Break;
482 }
483 };
484 let info = OutputCallbackInfo { timestamp };
485 data_callback(&mut data, &info);
486
487 let hresult = (*render_client).ReleaseBuffer(frames_available as u32, 0);
488 if let Err(err) = stream_error_from_hresult(hresult) {
489 error_callback(err);
490 return ControlFlow::Break;
491 }
492 }
493
494 ControlFlow::Continue
495 }
496
497 /// Convert the given duration in frames at the given sample rate to a `std::time::Duration`.
frames_to_duration(frames: u32, rate: crate::SampleRate) -> std::time::Duration498 fn frames_to_duration(frames: u32, rate: crate::SampleRate) -> std::time::Duration {
499 let secsf = frames as f64 / rate.0 as f64;
500 let secs = secsf as u64;
501 let nanos = ((secsf - secs as f64) * 1_000_000_000.0) as u32;
502 std::time::Duration::new(secs, nanos)
503 }
504
505 /// Use the stream's `IAudioClock` to produce the current stream instant.
506 ///
507 /// Uses the QPC position produced via the `GetPosition` method.
stream_instant(stream: &StreamInner) -> Result<crate::StreamInstant, StreamError>508 fn stream_instant(stream: &StreamInner) -> Result<crate::StreamInstant, StreamError> {
509 let mut position: UINT64 = 0;
510 let mut qpc_position: UINT64 = 0;
511 let res = unsafe { (*stream.audio_clock).GetPosition(&mut position, &mut qpc_position) };
512 stream_error_from_hresult(res)?;
513 // The `qpc_position` is in 100 nanosecond units. Convert it to nanoseconds.
514 let qpc_nanos = qpc_position as i128 * 100;
515 let instant = crate::StreamInstant::from_nanos_i128(qpc_nanos)
516 .expect("performance counter out of range of `StreamInstant` representation");
517 Ok(instant)
518 }
519
520 /// Produce the input stream timestamp.
521 ///
522 /// `buffer_qpc_position` is the `qpc_position` returned via the `GetBuffer` call on the capture
523 /// client. It represents the instant at which the first sample of the retrieved buffer was
524 /// captured.
input_timestamp( stream: &StreamInner, buffer_qpc_position: UINT64, ) -> Result<crate::InputStreamTimestamp, StreamError>525 fn input_timestamp(
526 stream: &StreamInner,
527 buffer_qpc_position: UINT64,
528 ) -> Result<crate::InputStreamTimestamp, StreamError> {
529 // The `qpc_position` is in 100 nanosecond units. Convert it to nanoseconds.
530 let qpc_nanos = buffer_qpc_position as i128 * 100;
531 let capture = crate::StreamInstant::from_nanos_i128(qpc_nanos)
532 .expect("performance counter out of range of `StreamInstant` representation");
533 let callback = stream_instant(stream)?;
534 Ok(crate::InputStreamTimestamp { capture, callback })
535 }
536
537 /// Produce the output stream timestamp.
538 ///
539 /// `frames_available` is the number of frames available for writing as reported by subtracting the
540 /// result of `GetCurrentPadding` from the maximum buffer size.
541 ///
542 /// `sample_rate` is the rate at which audio frames are processed by the device.
543 ///
544 /// TODO: The returned `playback` is an estimate that assumes audio is delivered immediately after
545 /// `frames_available` are consumed. The reality is that there is likely a tiny amount of latency
546 /// after this, but not sure how to determine this.
output_timestamp( stream: &StreamInner, frames_available: u32, sample_rate: crate::SampleRate, ) -> Result<crate::OutputStreamTimestamp, StreamError>547 fn output_timestamp(
548 stream: &StreamInner,
549 frames_available: u32,
550 sample_rate: crate::SampleRate,
551 ) -> Result<crate::OutputStreamTimestamp, StreamError> {
552 let callback = stream_instant(stream)?;
553 let buffer_duration = frames_to_duration(frames_available, sample_rate);
554 let playback = callback
555 .add(buffer_duration)
556 .expect("`playback` occurs beyond representation supported by `StreamInstant`");
557 Ok(crate::OutputStreamTimestamp { callback, playback })
558 }
559