1 // Copyright © 2017 Mozilla Foundation
2 //
3 // This program is made available under an ISC-style license. See the
4 // accompanying file LICENSE for details
5
6 #[cfg(target_os = "linux")]
7 use audio_thread_priority::{promote_thread_to_real_time, RtPriorityThreadInfo};
8 use audioipc::codec::LengthDelimitedCodec;
9 use audioipc::frame::{framed, Framed};
10 use audioipc::messages::{
11 CallbackReq, CallbackResp, ClientMessage, Device, DeviceCollectionReq, DeviceCollectionResp,
12 DeviceInfo, RegisterDeviceCollectionChanged, ServerMessage, StreamCreate, StreamCreateParams,
13 StreamInitParams, StreamParams,
14 };
15 use audioipc::platformhandle_passing::FramedWithPlatformHandles;
16 use audioipc::rpc;
17 use audioipc::shm::SharedMem;
18 use audioipc::{MessageStream, PlatformHandle};
19 use cubeb_core as cubeb;
20 use cubeb_core::ffi;
21 use futures::future::{self, FutureResult};
22 use futures::sync::oneshot;
23 use futures::Future;
24 use std::convert::From;
25 use std::ffi::CStr;
26 use std::mem::size_of;
27 use std::os::raw::{c_long, c_void};
28 use std::rc::Rc;
29 use std::sync::atomic::{AtomicUsize, Ordering};
30 use std::{cell::RefCell, sync::Mutex};
31 use std::{panic, slice};
32 use tokio::reactor;
33 use tokio::runtime::current_thread;
34
35 use crate::errors::*;
36
error(error: cubeb::Error) -> ClientMessage37 fn error(error: cubeb::Error) -> ClientMessage {
38 ClientMessage::Error(error.raw_code())
39 }
40
/// Keeps the list of servers that want to hear about device collection
/// changes and forwards cubeb's notifications to them.
struct CubebDeviceCollectionManager {
    /// Servers currently registered for notifications. The list sits behind a
    /// mutex because `device_collection_changed_callback` reads it from an
    /// internal cubeb thread.
    servers: Mutex<Vec<Rc<RefCell<CubebServerCallbacks>>>>,
}
44
45 impl CubebDeviceCollectionManager {
new() -> CubebDeviceCollectionManager46 fn new() -> CubebDeviceCollectionManager {
47 CubebDeviceCollectionManager {
48 servers: Mutex::new(Vec::new()),
49 }
50 }
51
register( &mut self, context: &cubeb::Context, server: &Rc<RefCell<CubebServerCallbacks>>, devtype: cubeb::DeviceType, ) -> cubeb::Result<()>52 fn register(
53 &mut self,
54 context: &cubeb::Context,
55 server: &Rc<RefCell<CubebServerCallbacks>>,
56 devtype: cubeb::DeviceType,
57 ) -> cubeb::Result<()> {
58 let mut servers = self.servers.lock().unwrap();
59 if servers.is_empty() {
60 self.internal_register(context, true)?;
61 }
62 server.borrow_mut().devtype.insert(devtype);
63 if !servers.iter().any(|s| Rc::ptr_eq(s, server)) {
64 servers.push(server.clone());
65 }
66 Ok(())
67 }
68
unregister( &mut self, context: &cubeb::Context, server: &Rc<RefCell<CubebServerCallbacks>>, devtype: cubeb::DeviceType, ) -> cubeb::Result<()>69 fn unregister(
70 &mut self,
71 context: &cubeb::Context,
72 server: &Rc<RefCell<CubebServerCallbacks>>,
73 devtype: cubeb::DeviceType,
74 ) -> cubeb::Result<()> {
75 let mut servers = self.servers.lock().unwrap();
76 server.borrow_mut().devtype.remove(devtype);
77 if server.borrow().devtype.is_empty() {
78 servers.retain(|s| !Rc::ptr_eq(&s, server));
79 }
80 if servers.is_empty() {
81 self.internal_register(context, false)?;
82 }
83 Ok(())
84 }
85
internal_register(&self, context: &cubeb::Context, enable: bool) -> cubeb::Result<()>86 fn internal_register(&self, context: &cubeb::Context, enable: bool) -> cubeb::Result<()> {
87 let user_ptr = if enable {
88 self as *const CubebDeviceCollectionManager as *mut c_void
89 } else {
90 std::ptr::null_mut()
91 };
92 for &(dir, cb) in &[
93 (
94 cubeb::DeviceType::INPUT,
95 device_collection_changed_input_cb_c as _,
96 ),
97 (
98 cubeb::DeviceType::OUTPUT,
99 device_collection_changed_output_cb_c as _,
100 ),
101 ] {
102 unsafe {
103 context.register_device_collection_changed(
104 dir,
105 if enable { Some(cb) } else { None },
106 user_ptr,
107 )?;
108 }
109 }
110 Ok(())
111 }
112
113 // Warning: this is called from an internal cubeb thread, so we must not mutate unprotected shared state.
device_collection_changed_callback(&self, device_type: ffi::cubeb_device_type)114 unsafe fn device_collection_changed_callback(&self, device_type: ffi::cubeb_device_type) {
115 let servers = self.servers.lock().unwrap();
116 servers.iter().for_each(|server| {
117 if server
118 .borrow()
119 .devtype
120 .contains(cubeb::DeviceType::from_bits_truncate(device_type))
121 {
122 server
123 .borrow_mut()
124 .device_collection_changed_callback(device_type)
125 }
126 });
127 }
128 }
129
// A cubeb_devid is an opaque type which may be implemented with a stable
// pointer in a cubeb backend. cubeb_devids received remotely must be
// validated before use, so DevIdMap provides a simple 1:1 mapping between a
// cubeb_devid and an IPC-transportable value suitable for use as a unique
// handle.
struct DevIdMap {
    // Known cubeb_devids; the position of an entry is its IPC handle.
    devices: Vec<usize>,
}

impl DevIdMap {
    // Build a map that already contains the null device id.
    fn new() -> DevIdMap {
        let mut devices = Vec::with_capacity(32);
        // A null cubeb_devid selects the default device, so handle 0 is
        // reserved up front and always maps to devid 0.
        devices.push(0);
        DevIdMap { devices }
    }

    // Return the handle for `devid`, allocating a new one the first time a
    // given devid is seen. The same devid always yields the same handle.
    fn make_handle(&mut self, devid: usize) -> usize {
        match self.devices.iter().position(|&known| known == devid) {
            Some(handle) => handle,
            None => {
                let handle = self.devices.len();
                self.devices.push(devid);
                handle
            }
        }
    }

    // Translate a handle obtained from `make_handle` back into its
    // cubeb_devid. Panics when `handle` was never issued.
    fn handle_to_id(&self, handle: usize) -> usize {
        self.devices[handle]
    }
}
166
/// Per-thread cubeb state: the (possibly failed) context together with the
/// device collection manager that accompanies it.
struct CubebContextState {
    /// Result of the most recent attempt to initialize cubeb.
    context: cubeb::Result<cubeb::Context>,
    /// Manager that tracks servers registered for device collection changes.
    manager: CubebDeviceCollectionManager,
}

// Thread-local slot holding the state above; it starts out empty and is
// filled on first use by `with_local_context`.
thread_local!(static CONTEXT_KEY: RefCell<Option<CubebContextState>> = RefCell::new(None));
173
cubeb_init_from_context_params() -> cubeb::Result<cubeb::Context>174 fn cubeb_init_from_context_params() -> cubeb::Result<cubeb::Context> {
175 let params = super::G_CUBEB_CONTEXT_PARAMS.lock().unwrap();
176 let context_name = Some(params.context_name.as_c_str());
177 let backend_name = params.backend_name.as_deref();
178 let r = cubeb::Context::init(context_name, backend_name);
179 r.map_err(|e| {
180 info!("cubeb::Context::init failed r={:?}", e);
181 e
182 })
183 }
184
with_local_context<T, F>(f: F) -> T where F: FnOnce(&cubeb::Result<cubeb::Context>, &mut CubebDeviceCollectionManager) -> T,185 fn with_local_context<T, F>(f: F) -> T
186 where
187 F: FnOnce(&cubeb::Result<cubeb::Context>, &mut CubebDeviceCollectionManager) -> T,
188 {
189 CONTEXT_KEY.with(|k| {
190 let mut state = k.borrow_mut();
191 if state.is_none() {
192 *state = Some(CubebContextState {
193 context: cubeb_init_from_context_params(),
194 manager: CubebDeviceCollectionManager::new(),
195 });
196 }
197 let CubebContextState { context, manager } = state.as_mut().unwrap();
198 // Always reattempt to initialize cubeb, OS config may have changed.
199 if context.is_err() {
200 *context = cubeb_init_from_context_params();
201 }
202 f(context, manager)
203 })
204 }
205
/// Marker type describing the RPC client used to send device collection
/// notifications: it issues `DeviceCollectionReq` and receives
/// `DeviceCollectionResp`.
struct DeviceCollectionClient;

impl rpc::Client for DeviceCollectionClient {
    type Request = DeviceCollectionReq;
    type Response = DeviceCollectionResp;
    // Length-delimited frames over an async message stream.
    type Transport =
        Framed<audioipc::AsyncMessageStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
}
214
/// Marker type describing the RPC client used for stream callbacks: it
/// issues `CallbackReq` and receives `CallbackResp`.
struct CallbackClient;

impl rpc::Client for CallbackClient {
    type Request = CallbackReq;
    type Response = CallbackResp;
    // Length-delimited frames over an async message stream.
    type Transport =
        Framed<audioipc::AsyncMessageStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
}
223
/// State handed to cubeb as the stream user pointer; the C callbacks cast the
/// pointer back to this type and forward each event to the client over `rpc`.
struct ServerStreamCallbacks {
    /// Size of input frame in bytes
    input_frame_size: u16,
    /// Size of output frame in bytes
    output_frame_size: u16,
    /// Shared memory buffer for sending input data to client
    input_shm: Option<SharedMem>,
    /// Shared memory buffer for receiving output data from client
    output_shm: Option<SharedMem>,
    /// RPC interface to callback server running in client
    rpc: rpc::ClientProxy<CallbackReq, CallbackResp>,
}
236
237 impl ServerStreamCallbacks {
data_callback(&mut self, input: &[u8], output: &mut [u8], nframes: isize) -> isize238 fn data_callback(&mut self, input: &[u8], output: &mut [u8], nframes: isize) -> isize {
239 trace!(
240 "Stream data callback: {} {} {}",
241 nframes,
242 input.len(),
243 output.len()
244 );
245
246 unsafe {
247 if let Some(shm) = &mut self.input_shm {
248 shm.get_mut_slice(input.len())
249 .unwrap()
250 .copy_from_slice(input);
251 }
252 }
253
254 let r = self
255 .rpc
256 .call(CallbackReq::Data {
257 nframes,
258 input_frame_size: self.input_frame_size as usize,
259 output_frame_size: self.output_frame_size as usize,
260 })
261 .wait();
262
263 match r {
264 Ok(CallbackResp::Data(frames)) => {
265 if frames >= 0 {
266 let nbytes = frames as usize * self.output_frame_size as usize;
267 trace!("Reslice output to {}", nbytes);
268 unsafe {
269 if let Some(shm) = &self.output_shm {
270 output[..nbytes].copy_from_slice(shm.get_slice(nbytes).unwrap());
271 }
272 }
273 }
274 frames
275 }
276 _ => {
277 debug!("Unexpected message {:?} during data_callback", r);
278 // TODO: Return a CUBEB_ERROR result here once
279 // https://github.com/kinetiknz/cubeb/issues/553 is
280 // fixed.
281 0
282 }
283 }
284 }
285
state_callback(&mut self, state: cubeb::State)286 fn state_callback(&mut self, state: cubeb::State) {
287 trace!("Stream state callback: {:?}", state);
288 let r = self.rpc.call(CallbackReq::State(state.into())).wait();
289 match r {
290 Ok(CallbackResp::State) => {}
291 _ => {
292 debug!("Unexpected message {:?} during state callback", r);
293 }
294 }
295 }
296
device_change_callback(&mut self)297 fn device_change_callback(&mut self) {
298 trace!("Stream device change callback");
299 let r = self.rpc.call(CallbackReq::DeviceChange).wait();
300 match r {
301 Ok(CallbackResp::DeviceChange) => {}
302 _ => {
303 debug!("Unexpected message {:?} during device change callback", r);
304 }
305 }
306 }
307 }
308
// Process-wide counter that makes every generated shm id distinct.
static SHM_ID: AtomicUsize = AtomicUsize::new(0);

// Build a shm_id fragment that no other call in this process has returned:
// the process id followed by an ever-increasing sequence number. The path
// derived from it only exists briefly: the shm segment is created under
// that name and immediately unlinked from the filesystem, while the server
// and client keep sharing it through the retained handles.
fn get_shm_id() -> String {
    let pid = std::process::id();
    let sequence = SHM_ID.fetch_add(1, Ordering::SeqCst);
    format!("cubeb-shm-{}-{}", pid, sequence)
}
322
/// A stream slot owned by the server: the cubeb stream plus the callback
/// state whose address is given to cubeb as the stream's user pointer.
struct ServerStream {
    /// The cubeb stream; `None` from stream creation until stream init
    /// succeeds.
    stream: Option<cubeb::Stream>,
    /// Callback state, boxed so the address handed to cubeb stays put.
    cbs: Box<ServerStreamCallbacks>,
}
327
328 impl Drop for ServerStream {
drop(&mut self)329 fn drop(&mut self) {
330 // `stream` *must* be dropped before `cbs`.
331 drop(self.stream.take());
332 }
333 }
334
/// Slab of server streams; a stream's slab key is the token sent to the client.
type StreamSlab = slab::Slab<ServerStream>;
336
/// Per-server state used to notify a client about device collection changes.
struct CubebServerCallbacks {
    /// RPC interface used to send `DeviceCollectionReq` messages to the client.
    rpc: rpc::ClientProxy<DeviceCollectionReq, DeviceCollectionResp>,
    /// Device types this server is currently registered to be notified about.
    devtype: cubeb::DeviceType,
}
341
342 impl CubebServerCallbacks {
device_collection_changed_callback(&mut self, device_type: ffi::cubeb_device_type)343 fn device_collection_changed_callback(&mut self, device_type: ffi::cubeb_device_type) {
344 // TODO: Assert device_type is in devtype.
345 debug!(
346 "Sending device collection ({:?}) changed event",
347 device_type
348 );
349 let _ = self
350 .rpc
351 .call(DeviceCollectionReq::DeviceChange(device_type))
352 .wait();
353 }
354 }
355
/// Server side of one client connection: handles `ServerMessage` requests and
/// owns the streams created on behalf of that client.
pub struct CubebServer {
    /// Handle used to spawn the futures that bind callback RPC clients.
    handle: current_thread::Handle,
    /// Streams created for this client, indexed by stream token.
    streams: StreamSlab,
    /// Pid announced by the client in `ClientConnect`; `None` until then.
    remote_pid: Option<u32>,
    /// Device collection callback state; `None` until the client has sent
    /// `ContextSetupDeviceCollectionCallback`.
    cbs: Option<Rc<RefCell<CubebServerCallbacks>>>,
    /// Mapping between cubeb_devids and the handles exchanged over IPC.
    devidmap: DevIdMap,
}
363
364 impl rpc::Server for CubebServer {
365 type Request = ServerMessage;
366 type Response = ClientMessage;
367 type Future = FutureResult<Self::Response, ()>;
368 type Transport = FramedWithPlatformHandles<
369 audioipc::AsyncMessageStream,
370 LengthDelimitedCodec<Self::Response, Self::Request>,
371 >;
372
process(&mut self, req: Self::Request) -> Self::Future373 fn process(&mut self, req: Self::Request) -> Self::Future {
374 if let ServerMessage::ClientConnect(pid) = req {
375 self.remote_pid = Some(pid);
376 }
377 let resp = with_local_context(|context, manager| match *context {
378 Err(_) => error(cubeb::Error::error()),
379 Ok(ref context) => self.process_msg(context, manager, &req),
380 });
381 future::ok(resp)
382 }
383 }
384
// Look up the cubeb stream for token `$stm_tok` in `$self.streams` and
// evaluate to a mutable reference to it.
//
// If the token is not in the slab, the problem is logged together with the
// source location and the *enclosing function* returns an invalid-parameter
// error reply. A token whose stream has been created but not yet initialized
// panics with "uninitialized stream".
//
// Debugging for BMO 1594216/1612044.
macro_rules! try_stream {
    ($self:expr, $stm_tok:expr) => {
        if $self.streams.contains($stm_tok) {
            $self.streams[$stm_tok]
                .stream
                .as_mut()
                .expect("uninitialized stream")
        } else {
            error!(
                "{}:{}:{} - Stream({}): invalid token",
                file!(),
                line!(),
                column!(),
                $stm_tok
            );
            return error(cubeb::Error::invalid_parameter());
        }
    };
}
405
406 impl CubebServer {
new(handle: current_thread::Handle) -> Self407 pub fn new(handle: current_thread::Handle) -> Self {
408 CubebServer {
409 handle,
410 streams: StreamSlab::new(),
411 remote_pid: None,
412 cbs: None,
413 devidmap: DevIdMap::new(),
414 }
415 }
416
417 // Process a request coming from the client.
process_msg( &mut self, context: &cubeb::Context, manager: &mut CubebDeviceCollectionManager, msg: &ServerMessage, ) -> ClientMessage418 fn process_msg(
419 &mut self,
420 context: &cubeb::Context,
421 manager: &mut CubebDeviceCollectionManager,
422 msg: &ServerMessage,
423 ) -> ClientMessage {
424 let resp: ClientMessage = match *msg {
425 ServerMessage::ClientConnect(_) => {
426 // remote_pid is set before cubeb initialization, just verify here.
427 assert!(self.remote_pid.is_some());
428 ClientMessage::ClientConnected
429 }
430
431 ServerMessage::ClientDisconnect => {
432 // TODO:
433 //self.connection.client_disconnect();
434 ClientMessage::ClientDisconnected
435 }
436
437 ServerMessage::ContextGetBackendId => {
438 ClientMessage::ContextBackendId(context.backend_id().to_string())
439 }
440
441 ServerMessage::ContextGetMaxChannelCount => context
442 .max_channel_count()
443 .map(ClientMessage::ContextMaxChannelCount)
444 .unwrap_or_else(error),
445
446 ServerMessage::ContextGetMinLatency(ref params) => {
447 let format = cubeb::SampleFormat::from(params.format);
448 let layout = cubeb::ChannelLayout::from(params.layout);
449
450 let params = cubeb::StreamParamsBuilder::new()
451 .format(format)
452 .rate(params.rate)
453 .channels(params.channels)
454 .layout(layout)
455 .take();
456
457 context
458 .min_latency(¶ms)
459 .map(ClientMessage::ContextMinLatency)
460 .unwrap_or_else(error)
461 }
462
463 ServerMessage::ContextGetPreferredSampleRate => context
464 .preferred_sample_rate()
465 .map(ClientMessage::ContextPreferredSampleRate)
466 .unwrap_or_else(error),
467
468 ServerMessage::ContextGetDeviceEnumeration(device_type) => context
469 .enumerate_devices(cubeb::DeviceType::from_bits_truncate(device_type))
470 .map(|devices| {
471 let v: Vec<DeviceInfo> = devices
472 .iter()
473 .map(|i| {
474 let mut tmp: DeviceInfo = i.as_ref().into();
475 // Replace each cubeb_devid with a unique handle suitable for IPC.
476 tmp.devid = self.devidmap.make_handle(tmp.devid);
477 tmp
478 })
479 .collect();
480 ClientMessage::ContextEnumeratedDevices(v)
481 })
482 .unwrap_or_else(error),
483
484 ServerMessage::StreamCreate(ref params) => self
485 .process_stream_create(params)
486 .unwrap_or_else(|_| error(cubeb::Error::error())),
487
488 ServerMessage::StreamInit(stm_tok, ref params) => self
489 .process_stream_init(context, stm_tok, params)
490 .unwrap_or_else(|_| error(cubeb::Error::error())),
491
492 ServerMessage::StreamDestroy(stm_tok) => {
493 if self.streams.contains(stm_tok) {
494 debug!("Unregistering stream {:?}", stm_tok);
495 self.streams.remove(stm_tok);
496 } else {
497 // Debugging for BMO 1594216/1612044.
498 error!("StreamDestroy({}): invalid token", stm_tok);
499 return error(cubeb::Error::invalid_parameter());
500 }
501 ClientMessage::StreamDestroyed
502 }
503
504 ServerMessage::StreamStart(stm_tok) => try_stream!(self, stm_tok)
505 .start()
506 .map(|_| ClientMessage::StreamStarted)
507 .unwrap_or_else(error),
508
509 ServerMessage::StreamStop(stm_tok) => try_stream!(self, stm_tok)
510 .stop()
511 .map(|_| ClientMessage::StreamStopped)
512 .unwrap_or_else(error),
513
514 ServerMessage::StreamGetPosition(stm_tok) => try_stream!(self, stm_tok)
515 .position()
516 .map(ClientMessage::StreamPosition)
517 .unwrap_or_else(error),
518
519 ServerMessage::StreamGetLatency(stm_tok) => try_stream!(self, stm_tok)
520 .latency()
521 .map(ClientMessage::StreamLatency)
522 .unwrap_or_else(error),
523
524 ServerMessage::StreamGetInputLatency(stm_tok) => try_stream!(self, stm_tok)
525 .input_latency()
526 .map(ClientMessage::StreamInputLatency)
527 .unwrap_or_else(error),
528
529 ServerMessage::StreamSetVolume(stm_tok, volume) => try_stream!(self, stm_tok)
530 .set_volume(volume)
531 .map(|_| ClientMessage::StreamVolumeSet)
532 .unwrap_or_else(error),
533
534 ServerMessage::StreamSetName(stm_tok, ref name) => try_stream!(self, stm_tok)
535 .set_name(name)
536 .map(|_| ClientMessage::StreamNameSet)
537 .unwrap_or_else(error),
538
539 ServerMessage::StreamGetCurrentDevice(stm_tok) => try_stream!(self, stm_tok)
540 .current_device()
541 .map(|device| ClientMessage::StreamCurrentDevice(Device::from(device)))
542 .unwrap_or_else(error),
543
544 ServerMessage::StreamRegisterDeviceChangeCallback(stm_tok, enable) => {
545 try_stream!(self, stm_tok)
546 .register_device_changed_callback(if enable {
547 Some(device_change_cb_c)
548 } else {
549 None
550 })
551 .map(|_| ClientMessage::StreamRegisterDeviceChangeCallback)
552 .unwrap_or_else(error)
553 }
554
555 ServerMessage::ContextSetupDeviceCollectionCallback => {
556 if let Ok((ipc_server, ipc_client)) = MessageStream::anonymous_ipc_pair() {
557 debug!(
558 "Created device collection RPC pair: {:?}-{:?}",
559 ipc_server, ipc_client
560 );
561
562 // This code is currently running on the Client/Server RPC
563 // handling thread. We need to move the registration of the
564 // bind_client to the callback RPC handling thread. This is
565 // done by spawning a future on `handle`.
566 let (tx, rx) = oneshot::channel();
567 self.handle
568 .spawn(futures::future::lazy(move || {
569 let handle = reactor::Handle::default();
570 let stream = ipc_server.into_tokio_ipc(&handle).unwrap();
571 let transport = framed(stream, Default::default());
572 let rpc = rpc::bind_client::<DeviceCollectionClient>(transport);
573 drop(tx.send(rpc));
574 Ok(())
575 }))
576 .expect("Failed to spawn DeviceCollectionClient");
577
578 // TODO: The lowest comms layer expects exactly 3 PlatformHandles, but we only
579 // need one here. Send some dummy handles over for the other side to discard.
580 let (dummy1, dummy2) =
581 MessageStream::anonymous_ipc_pair().expect("need dummy IPC pair");
582 if let Ok(rpc) = rx.wait() {
583 self.cbs = Some(Rc::new(RefCell::new(CubebServerCallbacks {
584 rpc,
585 devtype: cubeb::DeviceType::empty(),
586 })));
587 let fds = RegisterDeviceCollectionChanged {
588 platform_handles: [
589 PlatformHandle::from(ipc_client),
590 PlatformHandle::from(dummy1),
591 PlatformHandle::from(dummy2),
592 ],
593 target_pid: self.remote_pid.unwrap(),
594 };
595
596 ClientMessage::ContextSetupDeviceCollectionCallback(fds)
597 } else {
598 warn!("Failed to setup RPC client");
599 error(cubeb::Error::error())
600 }
601 } else {
602 warn!("Failed to create RPC pair");
603 error(cubeb::Error::error())
604 }
605 }
606
607 ServerMessage::ContextRegisterDeviceCollectionChanged(device_type, enable) => self
608 .process_register_device_collection_changed(
609 context,
610 manager,
611 cubeb::DeviceType::from_bits_truncate(device_type),
612 enable,
613 )
614 .unwrap_or_else(error),
615
616 #[cfg(target_os = "linux")]
617 ServerMessage::PromoteThreadToRealTime(thread_info) => {
618 let info = RtPriorityThreadInfo::deserialize(thread_info);
619 match promote_thread_to_real_time(info, 0, 48000) {
620 Ok(_) => {
621 info!("Promotion of content process thread to real-time OK");
622 }
623 Err(_) => {
624 warn!("Promotion of content process thread to real-time error");
625 }
626 }
627 ClientMessage::ThreadPromoted
628 }
629 };
630
631 trace!("process_msg: req={:?}, resp={:?}", msg, resp);
632
633 resp
634 }
635
process_register_device_collection_changed( &mut self, context: &cubeb::Context, manager: &mut CubebDeviceCollectionManager, devtype: cubeb::DeviceType, enable: bool, ) -> cubeb::Result<ClientMessage>636 fn process_register_device_collection_changed(
637 &mut self,
638 context: &cubeb::Context,
639 manager: &mut CubebDeviceCollectionManager,
640 devtype: cubeb::DeviceType,
641 enable: bool,
642 ) -> cubeb::Result<ClientMessage> {
643 if devtype == cubeb::DeviceType::UNKNOWN {
644 return Err(cubeb::Error::invalid_parameter());
645 }
646
647 assert!(self.cbs.is_some());
648 let cbs = self.cbs.as_ref().unwrap();
649
650 if enable {
651 manager.register(context, cbs, devtype)
652 } else {
653 manager.unregister(context, cbs, devtype)
654 }
655 .map(|_| ClientMessage::ContextRegisteredDeviceCollectionChanged)
656 }
657
658 // Stream create is special, so it's been separated from process_msg.
process_stream_create(&mut self, params: &StreamCreateParams) -> Result<ClientMessage>659 fn process_stream_create(&mut self, params: &StreamCreateParams) -> Result<ClientMessage> {
660 fn frame_size_in_bytes(params: Option<&StreamParams>) -> u16 {
661 params
662 .map(|p| {
663 let format = p.format.into();
664 let sample_size = match format {
665 cubeb::SampleFormat::S16LE
666 | cubeb::SampleFormat::S16BE
667 | cubeb::SampleFormat::S16NE => 2,
668 cubeb::SampleFormat::Float32LE
669 | cubeb::SampleFormat::Float32BE
670 | cubeb::SampleFormat::Float32NE => 4,
671 };
672 let channel_count = p.channels as u16;
673 sample_size * channel_count
674 })
675 .unwrap_or(0u16)
676 }
677
678 // Create the callback handling struct which is attached the cubeb stream.
679 let input_frame_size = frame_size_in_bytes(params.input_stream_params.as_ref());
680 let output_frame_size = frame_size_in_bytes(params.output_stream_params.as_ref());
681
682 let (ipc_server, ipc_client) = MessageStream::anonymous_ipc_pair()?;
683 debug!("Created callback pair: {:?}-{:?}", ipc_server, ipc_client);
684 let shm_id = get_shm_id();
685 let (input_shm, input_file) =
686 SharedMem::new(&format!("{}-input", shm_id), audioipc::SHM_AREA_SIZE)?;
687 let (output_shm, output_file) =
688 SharedMem::new(&format!("{}-output", shm_id), audioipc::SHM_AREA_SIZE)?;
689
690 // This code is currently running on the Client/Server RPC
691 // handling thread. We need to move the registration of the
692 // bind_client to the callback RPC handling thread. This is
693 // done by spawning a future on `handle`.
694 let (tx, rx) = oneshot::channel();
695 self.handle
696 .spawn(futures::future::lazy(move || {
697 let handle = reactor::Handle::default();
698 let stream = ipc_server.into_tokio_ipc(&handle).unwrap();
699 let transport = framed(stream, Default::default());
700 let rpc = rpc::bind_client::<CallbackClient>(transport);
701 drop(tx.send(rpc));
702 Ok(())
703 }))
704 .expect("Failed to spawn CallbackClient");
705
706 let rpc: rpc::ClientProxy<CallbackReq, CallbackResp> = match rx.wait() {
707 Ok(rpc) => rpc,
708 Err(_) => bail!("Failed to create callback rpc."),
709 };
710
711 // TODO: The lowest comms layer expects exactly 3 PlatformHandles, so we always configure both sides of the shm.
712 // ServerStreamCallbacks only needs the active shm, so drop any unused shm now.
713 let input_shm = params.input_stream_params.and(Some(input_shm));
714 let output_shm = params.output_stream_params.and(Some(output_shm));
715
716 let cbs = Box::new(ServerStreamCallbacks {
717 input_frame_size,
718 output_frame_size,
719 input_shm,
720 output_shm,
721 rpc,
722 });
723
724 let entry = self.streams.vacant_entry();
725 let key = entry.key();
726 debug!("Registering stream {:?}", key);
727
728 entry.insert(ServerStream { stream: None, cbs });
729
730 Ok(ClientMessage::StreamCreated(StreamCreate {
731 token: key,
732 platform_handles: [PlatformHandle::from(ipc_client), input_file, output_file],
733 target_pid: self.remote_pid.unwrap(),
734 }))
735 }
736
737 // Stream init is special, so it's been separated from process_msg.
process_stream_init( &mut self, context: &cubeb::Context, stm_tok: usize, params: &StreamInitParams, ) -> Result<ClientMessage>738 fn process_stream_init(
739 &mut self,
740 context: &cubeb::Context,
741 stm_tok: usize,
742 params: &StreamInitParams,
743 ) -> Result<ClientMessage> {
744 // Create cubeb stream from params
745 let stream_name = params
746 .stream_name
747 .as_ref()
748 .and_then(|name| CStr::from_bytes_with_nul(name).ok());
749
750 // Map IPC handle back to cubeb_devid.
751 let input_device = self.devidmap.handle_to_id(params.input_device) as *const _;
752 let input_stream_params = params.input_stream_params.as_ref().map(|isp| unsafe {
753 cubeb::StreamParamsRef::from_ptr(isp as *const StreamParams as *mut _)
754 });
755
756 // Map IPC handle back to cubeb_devid.
757 let output_device = self.devidmap.handle_to_id(params.output_device) as *const _;
758 let output_stream_params = params.output_stream_params.as_ref().map(|osp| unsafe {
759 cubeb::StreamParamsRef::from_ptr(osp as *const StreamParams as *mut _)
760 });
761
762 let latency = params.latency_frames;
763
764 let server_stream = &mut self.streams[stm_tok];
765 assert!(size_of::<Box<ServerStreamCallbacks>>() == size_of::<usize>());
766 let user_ptr = server_stream.cbs.as_ref() as *const ServerStreamCallbacks as *mut c_void;
767
768 let stream = unsafe {
769 let stream = context.stream_init(
770 stream_name,
771 input_device,
772 input_stream_params,
773 output_device,
774 output_stream_params,
775 latency,
776 Some(data_cb_c),
777 Some(state_cb_c),
778 user_ptr,
779 );
780 match stream {
781 Ok(stream) => stream,
782 Err(e) => {
783 debug!("Unregistering stream {:?} (stream error {:?})", stm_tok, e);
784 self.streams.remove(stm_tok);
785 return Err(e.into());
786 }
787 }
788 };
789
790 server_stream.stream = Some(stream);
791
792 Ok(ClientMessage::StreamInitialized)
793 }
794 }
795
796 // C callable callbacks
data_cb_c( _: *mut ffi::cubeb_stream, user_ptr: *mut c_void, input_buffer: *const c_void, output_buffer: *mut c_void, nframes: c_long, ) -> c_long797 unsafe extern "C" fn data_cb_c(
798 _: *mut ffi::cubeb_stream,
799 user_ptr: *mut c_void,
800 input_buffer: *const c_void,
801 output_buffer: *mut c_void,
802 nframes: c_long,
803 ) -> c_long {
804 let ok = panic::catch_unwind(|| {
805 let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks);
806 let input = if input_buffer.is_null() {
807 &[]
808 } else {
809 let nbytes = nframes * c_long::from(cbs.input_frame_size);
810 slice::from_raw_parts(input_buffer as *const u8, nbytes as usize)
811 };
812 let output: &mut [u8] = if output_buffer.is_null() {
813 &mut []
814 } else {
815 let nbytes = nframes * c_long::from(cbs.output_frame_size);
816 slice::from_raw_parts_mut(output_buffer as *mut u8, nbytes as usize)
817 };
818 cbs.data_callback(input, output, nframes as isize) as c_long
819 });
820 // TODO: Return a CUBEB_ERROR result here once
821 // https://github.com/kinetiknz/cubeb/issues/553 is fixed.
822 ok.unwrap_or(0)
823 }
824
state_cb_c( _: *mut ffi::cubeb_stream, user_ptr: *mut c_void, state: ffi::cubeb_state, )825 unsafe extern "C" fn state_cb_c(
826 _: *mut ffi::cubeb_stream,
827 user_ptr: *mut c_void,
828 state: ffi::cubeb_state,
829 ) {
830 let ok = panic::catch_unwind(|| {
831 let state = cubeb::State::from(state);
832 let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks);
833 cbs.state_callback(state);
834 });
835 ok.expect("State callback panicked");
836 }
837
device_change_cb_c(user_ptr: *mut c_void)838 unsafe extern "C" fn device_change_cb_c(user_ptr: *mut c_void) {
839 let ok = panic::catch_unwind(|| {
840 let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks);
841 cbs.device_change_callback();
842 });
843 ok.expect("Device change callback panicked");
844 }
845
device_collection_changed_input_cb_c( _: *mut ffi::cubeb, user_ptr: *mut c_void, )846 unsafe extern "C" fn device_collection_changed_input_cb_c(
847 _: *mut ffi::cubeb,
848 user_ptr: *mut c_void,
849 ) {
850 let ok = panic::catch_unwind(|| {
851 let manager = &mut *(user_ptr as *mut CubebDeviceCollectionManager);
852 manager.device_collection_changed_callback(ffi::CUBEB_DEVICE_TYPE_INPUT);
853 });
854 ok.expect("Collection changed (input) callback panicked");
855 }
856
device_collection_changed_output_cb_c( _: *mut ffi::cubeb, user_ptr: *mut c_void, )857 unsafe extern "C" fn device_collection_changed_output_cb_c(
858 _: *mut ffi::cubeb,
859 user_ptr: *mut c_void,
860 ) {
861 let ok = panic::catch_unwind(|| {
862 let manager = &mut *(user_ptr as *mut CubebDeviceCollectionManager);
863 manager.device_collection_changed_callback(ffi::CUBEB_DEVICE_TYPE_OUTPUT);
864 });
865 ok.expect("Collection changed (output) callback panicked");
866 }
867