1 // Copyright © 2017 Mozilla Foundation
2 //
3 // This program is made available under an ISC-style license.  See the
4 // accompanying file LICENSE for details
5 
6 #[cfg(target_os = "linux")]
7 use audio_thread_priority::{promote_thread_to_real_time, RtPriorityThreadInfo};
8 use audioipc::codec::LengthDelimitedCodec;
9 use audioipc::frame::{framed, Framed};
10 use audioipc::messages::{
11     CallbackReq, CallbackResp, ClientMessage, Device, DeviceCollectionReq, DeviceCollectionResp,
12     DeviceInfo, RegisterDeviceCollectionChanged, ServerMessage, StreamCreate, StreamCreateParams,
13     StreamInitParams, StreamParams,
14 };
15 use audioipc::platformhandle_passing::FramedWithPlatformHandles;
16 use audioipc::rpc;
17 use audioipc::shm::SharedMem;
18 use audioipc::{MessageStream, PlatformHandle};
19 use cubeb_core as cubeb;
20 use cubeb_core::ffi;
21 use futures::future::{self, FutureResult};
22 use futures::sync::oneshot;
23 use futures::Future;
24 use std::convert::From;
25 use std::ffi::CStr;
26 use std::mem::size_of;
27 use std::os::raw::{c_long, c_void};
28 use std::rc::Rc;
29 use std::sync::atomic::{AtomicUsize, Ordering};
30 use std::{cell::RefCell, sync::Mutex};
31 use std::{panic, slice};
32 use tokio::reactor;
33 use tokio::runtime::current_thread;
34 
35 use crate::errors::*;
36 
error(error: cubeb::Error) -> ClientMessage37 fn error(error: cubeb::Error) -> ClientMessage {
38     ClientMessage::Error(error.raw_code())
39 }
40 
41 struct CubebDeviceCollectionManager {
42     servers: Mutex<Vec<Rc<RefCell<CubebServerCallbacks>>>>,
43 }
44 
45 impl CubebDeviceCollectionManager {
new() -> CubebDeviceCollectionManager46     fn new() -> CubebDeviceCollectionManager {
47         CubebDeviceCollectionManager {
48             servers: Mutex::new(Vec::new()),
49         }
50     }
51 
register( &mut self, context: &cubeb::Context, server: &Rc<RefCell<CubebServerCallbacks>>, devtype: cubeb::DeviceType, ) -> cubeb::Result<()>52     fn register(
53         &mut self,
54         context: &cubeb::Context,
55         server: &Rc<RefCell<CubebServerCallbacks>>,
56         devtype: cubeb::DeviceType,
57     ) -> cubeb::Result<()> {
58         let mut servers = self.servers.lock().unwrap();
59         if servers.is_empty() {
60             self.internal_register(context, true)?;
61         }
62         server.borrow_mut().devtype.insert(devtype);
63         if !servers.iter().any(|s| Rc::ptr_eq(s, server)) {
64             servers.push(server.clone());
65         }
66         Ok(())
67     }
68 
unregister( &mut self, context: &cubeb::Context, server: &Rc<RefCell<CubebServerCallbacks>>, devtype: cubeb::DeviceType, ) -> cubeb::Result<()>69     fn unregister(
70         &mut self,
71         context: &cubeb::Context,
72         server: &Rc<RefCell<CubebServerCallbacks>>,
73         devtype: cubeb::DeviceType,
74     ) -> cubeb::Result<()> {
75         let mut servers = self.servers.lock().unwrap();
76         server.borrow_mut().devtype.remove(devtype);
77         if server.borrow().devtype.is_empty() {
78             servers.retain(|s| !Rc::ptr_eq(&s, server));
79         }
80         if servers.is_empty() {
81             self.internal_register(context, false)?;
82         }
83         Ok(())
84     }
85 
internal_register(&self, context: &cubeb::Context, enable: bool) -> cubeb::Result<()>86     fn internal_register(&self, context: &cubeb::Context, enable: bool) -> cubeb::Result<()> {
87         let user_ptr = if enable {
88             self as *const CubebDeviceCollectionManager as *mut c_void
89         } else {
90             std::ptr::null_mut()
91         };
92         for &(dir, cb) in &[
93             (
94                 cubeb::DeviceType::INPUT,
95                 device_collection_changed_input_cb_c as _,
96             ),
97             (
98                 cubeb::DeviceType::OUTPUT,
99                 device_collection_changed_output_cb_c as _,
100             ),
101         ] {
102             unsafe {
103                 context.register_device_collection_changed(
104                     dir,
105                     if enable { Some(cb) } else { None },
106                     user_ptr,
107                 )?;
108             }
109         }
110         Ok(())
111     }
112 
113     // Warning: this is called from an internal cubeb thread, so we must not mutate unprotected shared state.
device_collection_changed_callback(&self, device_type: ffi::cubeb_device_type)114     unsafe fn device_collection_changed_callback(&self, device_type: ffi::cubeb_device_type) {
115         let servers = self.servers.lock().unwrap();
116         servers.iter().for_each(|server| {
117             if server
118                 .borrow()
119                 .devtype
120                 .contains(cubeb::DeviceType::from_bits_truncate(device_type))
121             {
122                 server
123                     .borrow_mut()
124                     .device_collection_changed_callback(device_type)
125             }
126         });
127     }
128 }
129 
/// Table translating between cubeb_devids and IPC-transportable handles.
struct DevIdMap {
    /// `devices[handle]` is the cubeb_devid that `handle` stands for.
    devices: Vec<usize>,
}

// A cubeb_devid is an opaque type which may be implemented with a stable
// pointer in a cubeb backend.  cubeb_devids received remotely must be
// validated before use, so DevIdMap provides a simple 1:1 mapping between a
// cubeb_devid and an IPC-transportable value suitable for use as a unique
// handle.
impl DevIdMap {
    /// Create a map that already contains the `0 -> 0` entry.
    fn new() -> DevIdMap {
        let mut devices = Vec::with_capacity(32);
        // A null cubeb_devid is used for selecting the default device, so
        // handle 0 is reserved for devid 0.
        devices.push(0);
        DevIdMap { devices }
    }

    // Given a cubeb_devid, return a unique stable value suitable for use
    // over IPC.  A devid seen before yields the handle it was given then.
    fn make_handle(&mut self, devid: usize) -> usize {
        match self.devices.iter().position(|&known| known == devid) {
            Some(handle) => handle,
            None => {
                // The new entry's index becomes its handle.
                let handle = self.devices.len();
                self.devices.push(devid);
                handle
            }
        }
    }

    // Given a handle produced by `make_handle`, return the associated
    // cubeb_devid.  Invalid handles result in a panic.
    fn handle_to_id(&self, handle: usize) -> usize {
        self.devices[handle]
    }
}
166 
/// Cubeb state kept in the thread-local `CONTEXT_KEY` slot.
struct CubebContextState {
    /// Outcome of the most recent cubeb context initialization attempt.
    context: cubeb::Result<cubeb::Context>,
    /// Device collection change registrations kept alongside the context.
    manager: CubebDeviceCollectionManager,
}
171 
// Thread-local slot for the cubeb context state.  It starts out as `None` and
// is filled in by `with_local_context`.
thread_local!(static CONTEXT_KEY: RefCell<Option<CubebContextState>> = RefCell::new(None));
173 
cubeb_init_from_context_params() -> cubeb::Result<cubeb::Context>174 fn cubeb_init_from_context_params() -> cubeb::Result<cubeb::Context> {
175     let params = super::G_CUBEB_CONTEXT_PARAMS.lock().unwrap();
176     let context_name = Some(params.context_name.as_c_str());
177     let backend_name = params.backend_name.as_deref();
178     let r = cubeb::Context::init(context_name, backend_name);
179     r.map_err(|e| {
180         info!("cubeb::Context::init failed r={:?}", e);
181         e
182     })
183 }
184 
with_local_context<T, F>(f: F) -> T where F: FnOnce(&cubeb::Result<cubeb::Context>, &mut CubebDeviceCollectionManager) -> T,185 fn with_local_context<T, F>(f: F) -> T
186 where
187     F: FnOnce(&cubeb::Result<cubeb::Context>, &mut CubebDeviceCollectionManager) -> T,
188 {
189     CONTEXT_KEY.with(|k| {
190         let mut state = k.borrow_mut();
191         if state.is_none() {
192             *state = Some(CubebContextState {
193                 context: cubeb_init_from_context_params(),
194                 manager: CubebDeviceCollectionManager::new(),
195             });
196         }
197         let CubebContextState { context, manager } = state.as_mut().unwrap();
198         // Always reattempt to initialize cubeb, OS config may have changed.
199         if context.is_err() {
200             *context = cubeb_init_from_context_params();
201         }
202         f(context, manager)
203     })
204 }
205 
/// Marker type selecting the RPC client configuration used for device
/// collection change notifications.
struct DeviceCollectionClient;

impl rpc::Client for DeviceCollectionClient {
    // Requests sent through this client.
    type Request = DeviceCollectionReq;
    // Replies expected for those requests.
    type Response = DeviceCollectionResp;
    // Length-delimited framing over an async message stream.
    type Transport =
        Framed<audioipc::AsyncMessageStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
}
214 
/// Marker type selecting the RPC client configuration used for per-stream
/// callbacks.
struct CallbackClient;

impl rpc::Client for CallbackClient {
    // Requests sent through this client.
    type Request = CallbackReq;
    // Replies expected for those requests.
    type Response = CallbackResp;
    // Length-delimited framing over an async message stream.
    type Transport =
        Framed<audioipc::AsyncMessageStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
}
223 
224 struct ServerStreamCallbacks {
225     /// Size of input frame in bytes
226     input_frame_size: u16,
227     /// Size of output frame in bytes
228     output_frame_size: u16,
229     /// Shared memory buffer for sending input data to client
230     input_shm: Option<SharedMem>,
231     /// Shared memory buffer for receiving output data from client
232     output_shm: Option<SharedMem>,
233     /// RPC interface to callback server running in client
234     rpc: rpc::ClientProxy<CallbackReq, CallbackResp>,
235 }
236 
237 impl ServerStreamCallbacks {
data_callback(&mut self, input: &[u8], output: &mut [u8], nframes: isize) -> isize238     fn data_callback(&mut self, input: &[u8], output: &mut [u8], nframes: isize) -> isize {
239         trace!(
240             "Stream data callback: {} {} {}",
241             nframes,
242             input.len(),
243             output.len()
244         );
245 
246         unsafe {
247             if let Some(shm) = &mut self.input_shm {
248                 shm.get_mut_slice(input.len())
249                     .unwrap()
250                     .copy_from_slice(input);
251             }
252         }
253 
254         let r = self
255             .rpc
256             .call(CallbackReq::Data {
257                 nframes,
258                 input_frame_size: self.input_frame_size as usize,
259                 output_frame_size: self.output_frame_size as usize,
260             })
261             .wait();
262 
263         match r {
264             Ok(CallbackResp::Data(frames)) => {
265                 if frames >= 0 {
266                     let nbytes = frames as usize * self.output_frame_size as usize;
267                     trace!("Reslice output to {}", nbytes);
268                     unsafe {
269                         if let Some(shm) = &self.output_shm {
270                             output[..nbytes].copy_from_slice(shm.get_slice(nbytes).unwrap());
271                         }
272                     }
273                 }
274                 frames
275             }
276             _ => {
277                 debug!("Unexpected message {:?} during data_callback", r);
278                 // TODO: Return a CUBEB_ERROR result here once
279                 // https://github.com/kinetiknz/cubeb/issues/553 is
280                 // fixed.
281                 0
282             }
283         }
284     }
285 
state_callback(&mut self, state: cubeb::State)286     fn state_callback(&mut self, state: cubeb::State) {
287         trace!("Stream state callback: {:?}", state);
288         let r = self.rpc.call(CallbackReq::State(state.into())).wait();
289         match r {
290             Ok(CallbackResp::State) => {}
291             _ => {
292                 debug!("Unexpected message {:?} during state callback", r);
293             }
294         }
295     }
296 
device_change_callback(&mut self)297     fn device_change_callback(&mut self) {
298         trace!("Stream device change callback");
299         let r = self.rpc.call(CallbackReq::DeviceChange).wait();
300         match r {
301             Ok(CallbackResp::DeviceChange) => {}
302             _ => {
303                 debug!("Unexpected message {:?} during device change callback", r);
304             }
305         }
306     }
307 }
308 
/// Process-wide counter supplying the sequence part of each shm id.
static SHM_ID: AtomicUsize = AtomicUsize::new(0);

// Generate a temporary shm_id fragment that is unique to the process.  This
// path is used temporarily to create a shm segment, which is then
// immediately deleted from the filesystem while retaining handles to the
// shm to be shared between the server and client.
fn get_shm_id() -> String {
    let pid = std::process::id();
    // Each call consumes one counter value, so ids never repeat in-process.
    let sequence = SHM_ID.fetch_add(1, Ordering::SeqCst);
    format!("cubeb-shm-{}-{}", pid, sequence)
}
322 
323 struct ServerStream {
324     stream: Option<cubeb::Stream>,
325     cbs: Box<ServerStreamCallbacks>,
326 }
327 
328 impl Drop for ServerStream {
drop(&mut self)329     fn drop(&mut self) {
330         // `stream` *must* be dropped before `cbs`.
331         drop(self.stream.take());
332     }
333 }
334 
// Slab of server streams; the slab key is the stream token sent to the client.
type StreamSlab = slab::Slab<ServerStream>;
336 
337 struct CubebServerCallbacks {
338     rpc: rpc::ClientProxy<DeviceCollectionReq, DeviceCollectionResp>,
339     devtype: cubeb::DeviceType,
340 }
341 
342 impl CubebServerCallbacks {
device_collection_changed_callback(&mut self, device_type: ffi::cubeb_device_type)343     fn device_collection_changed_callback(&mut self, device_type: ffi::cubeb_device_type) {
344         // TODO: Assert device_type is in devtype.
345         debug!(
346             "Sending device collection ({:?}) changed event",
347             device_type
348         );
349         let _ = self
350             .rpc
351             .call(DeviceCollectionReq::DeviceChange(device_type))
352             .wait();
353     }
354 }
355 
/// Server-side state for one client connection.
pub struct CubebServer {
    /// Handle used to spawn the futures that bind callback RPC clients.
    handle: current_thread::Handle,
    /// Streams created for this client, indexed by stream token.
    streams: StreamSlab,
    /// Pid reported by the client's `ClientConnect` message, once received.
    remote_pid: Option<u32>,
    /// Device collection callback proxy, set by
    /// `ContextSetupDeviceCollectionCallback`.
    cbs: Option<Rc<RefCell<CubebServerCallbacks>>>,
    /// Mapping between cubeb_devids and the handles exposed to the client.
    devidmap: DevIdMap,
}
363 
364 impl rpc::Server for CubebServer {
365     type Request = ServerMessage;
366     type Response = ClientMessage;
367     type Future = FutureResult<Self::Response, ()>;
368     type Transport = FramedWithPlatformHandles<
369         audioipc::AsyncMessageStream,
370         LengthDelimitedCodec<Self::Response, Self::Request>,
371     >;
372 
process(&mut self, req: Self::Request) -> Self::Future373     fn process(&mut self, req: Self::Request) -> Self::Future {
374         if let ServerMessage::ClientConnect(pid) = req {
375             self.remote_pid = Some(pid);
376         }
377         let resp = with_local_context(|context, manager| match *context {
378             Err(_) => error(cubeb::Error::error()),
379             Ok(ref context) => self.process_msg(context, manager, &req),
380         });
381         future::ok(resp)
382     }
383 }
384 
// Debugging for BMO 1594216/1612044.
//
// Expands to a mutable reference to the cubeb stream stored under `$stm_tok`
// in `$self.streams`.  If the token is not present in the slab, an error is
// logged and the *enclosing function* returns
// `error(cubeb::Error::invalid_parameter())`.  A token whose stream has not
// been initialized yet panics with "uninitialized stream".
macro_rules! try_stream {
    ($self:expr, $stm_tok:expr) => {
        if $self.streams.contains($stm_tok) {
            $self.streams[$stm_tok]
                .stream
                .as_mut()
                .expect("uninitialized stream")
        } else {
            // file!/line!/column! identify the macro call site in the log.
            error!(
                "{}:{}:{} - Stream({}): invalid token",
                file!(),
                line!(),
                column!(),
                $stm_tok
            );
            return error(cubeb::Error::invalid_parameter());
        }
    };
}
405 
406 impl CubebServer {
new(handle: current_thread::Handle) -> Self407     pub fn new(handle: current_thread::Handle) -> Self {
408         CubebServer {
409             handle,
410             streams: StreamSlab::new(),
411             remote_pid: None,
412             cbs: None,
413             devidmap: DevIdMap::new(),
414         }
415     }
416 
    // Process a request coming from the client.
    //
    // Dispatches on the message variant and returns the reply to send back.
    // Failures reported by cubeb are converted into `ClientMessage::Error`
    // via `error`.  Arms using `try_stream!`, and `StreamDestroy`, return
    // early with an invalid-parameter error when the stream token is not in
    // `self.streams`; those early returns skip the trace at the end.
    fn process_msg(
        &mut self,
        context: &cubeb::Context,
        manager: &mut CubebDeviceCollectionManager,
        msg: &ServerMessage,
    ) -> ClientMessage {
        let resp: ClientMessage = match *msg {
            ServerMessage::ClientConnect(_) => {
                // remote_pid is set before cubeb initialization, just verify here.
                assert!(self.remote_pid.is_some());
                ClientMessage::ClientConnected
            }

            ServerMessage::ClientDisconnect => {
                // TODO:
                //self.connection.client_disconnect();
                ClientMessage::ClientDisconnected
            }

            ServerMessage::ContextGetBackendId => {
                ClientMessage::ContextBackendId(context.backend_id().to_string())
            }

            ServerMessage::ContextGetMaxChannelCount => context
                .max_channel_count()
                .map(ClientMessage::ContextMaxChannelCount)
                .unwrap_or_else(error),

            ServerMessage::ContextGetMinLatency(ref params) => {
                // Rebuild cubeb stream params from the IPC representation.
                let format = cubeb::SampleFormat::from(params.format);
                let layout = cubeb::ChannelLayout::from(params.layout);

                let params = cubeb::StreamParamsBuilder::new()
                    .format(format)
                    .rate(params.rate)
                    .channels(params.channels)
                    .layout(layout)
                    .take();

                context
                    .min_latency(&params)
                    .map(ClientMessage::ContextMinLatency)
                    .unwrap_or_else(error)
            }

            ServerMessage::ContextGetPreferredSampleRate => context
                .preferred_sample_rate()
                .map(ClientMessage::ContextPreferredSampleRate)
                .unwrap_or_else(error),

            ServerMessage::ContextGetDeviceEnumeration(device_type) => context
                .enumerate_devices(cubeb::DeviceType::from_bits_truncate(device_type))
                .map(|devices| {
                    let v: Vec<DeviceInfo> = devices
                        .iter()
                        .map(|i| {
                            let mut tmp: DeviceInfo = i.as_ref().into();
                            // Replace each cubeb_devid with a unique handle suitable for IPC.
                            tmp.devid = self.devidmap.make_handle(tmp.devid);
                            tmp
                        })
                        .collect();
                    ClientMessage::ContextEnumeratedDevices(v)
                })
                .unwrap_or_else(error),

            // Stream creation/initialization failures are all reported to the
            // client as a generic cubeb error.
            ServerMessage::StreamCreate(ref params) => self
                .process_stream_create(params)
                .unwrap_or_else(|_| error(cubeb::Error::error())),

            ServerMessage::StreamInit(stm_tok, ref params) => self
                .process_stream_init(context, stm_tok, params)
                .unwrap_or_else(|_| error(cubeb::Error::error())),

            ServerMessage::StreamDestroy(stm_tok) => {
                if self.streams.contains(stm_tok) {
                    debug!("Unregistering stream {:?}", stm_tok);
                    self.streams.remove(stm_tok);
                } else {
                    // Debugging for BMO 1594216/1612044.
                    error!("StreamDestroy({}): invalid token", stm_tok);
                    return error(cubeb::Error::invalid_parameter());
                }
                ClientMessage::StreamDestroyed
            }

            ServerMessage::StreamStart(stm_tok) => try_stream!(self, stm_tok)
                .start()
                .map(|_| ClientMessage::StreamStarted)
                .unwrap_or_else(error),

            ServerMessage::StreamStop(stm_tok) => try_stream!(self, stm_tok)
                .stop()
                .map(|_| ClientMessage::StreamStopped)
                .unwrap_or_else(error),

            ServerMessage::StreamGetPosition(stm_tok) => try_stream!(self, stm_tok)
                .position()
                .map(ClientMessage::StreamPosition)
                .unwrap_or_else(error),

            ServerMessage::StreamGetLatency(stm_tok) => try_stream!(self, stm_tok)
                .latency()
                .map(ClientMessage::StreamLatency)
                .unwrap_or_else(error),

            ServerMessage::StreamGetInputLatency(stm_tok) => try_stream!(self, stm_tok)
                .input_latency()
                .map(ClientMessage::StreamInputLatency)
                .unwrap_or_else(error),

            ServerMessage::StreamSetVolume(stm_tok, volume) => try_stream!(self, stm_tok)
                .set_volume(volume)
                .map(|_| ClientMessage::StreamVolumeSet)
                .unwrap_or_else(error),

            ServerMessage::StreamSetName(stm_tok, ref name) => try_stream!(self, stm_tok)
                .set_name(name)
                .map(|_| ClientMessage::StreamNameSet)
                .unwrap_or_else(error),

            ServerMessage::StreamGetCurrentDevice(stm_tok) => try_stream!(self, stm_tok)
                .current_device()
                .map(|device| ClientMessage::StreamCurrentDevice(Device::from(device)))
                .unwrap_or_else(error),

            ServerMessage::StreamRegisterDeviceChangeCallback(stm_tok, enable) => {
                // `enable == false` clears the callback by passing `None`.
                try_stream!(self, stm_tok)
                    .register_device_changed_callback(if enable {
                        Some(device_change_cb_c)
                    } else {
                        None
                    })
                    .map(|_| ClientMessage::StreamRegisterDeviceChangeCallback)
                    .unwrap_or_else(error)
            }

            ServerMessage::ContextSetupDeviceCollectionCallback => {
                if let Ok((ipc_server, ipc_client)) = MessageStream::anonymous_ipc_pair() {
                    debug!(
                        "Created device collection RPC pair: {:?}-{:?}",
                        ipc_server, ipc_client
                    );

                    // This code is currently running on the Client/Server RPC
                    // handling thread.  We need to move the registration of the
                    // bind_client to the callback RPC handling thread.  This is
                    // done by spawning a future on `handle`.
                    let (tx, rx) = oneshot::channel();
                    self.handle
                        .spawn(futures::future::lazy(move || {
                            let handle = reactor::Handle::default();
                            let stream = ipc_server.into_tokio_ipc(&handle).unwrap();
                            let transport = framed(stream, Default::default());
                            let rpc = rpc::bind_client::<DeviceCollectionClient>(transport);
                            // The bound proxy is handed back through the oneshot.
                            drop(tx.send(rpc));
                            Ok(())
                        }))
                        .expect("Failed to spawn DeviceCollectionClient");

                    // TODO: The lowest comms layer expects exactly 3 PlatformHandles, but we only
                    // need one here.  Send some dummy handles over for the other side to discard.
                    let (dummy1, dummy2) =
                        MessageStream::anonymous_ipc_pair().expect("need dummy IPC pair");
                    // Blocks until the spawned future has delivered the proxy.
                    if let Ok(rpc) = rx.wait() {
                        self.cbs = Some(Rc::new(RefCell::new(CubebServerCallbacks {
                            rpc,
                            devtype: cubeb::DeviceType::empty(),
                        })));
                        let fds = RegisterDeviceCollectionChanged {
                            platform_handles: [
                                PlatformHandle::from(ipc_client),
                                PlatformHandle::from(dummy1),
                                PlatformHandle::from(dummy2),
                            ],
                            // Panics if no ClientConnect has been processed yet.
                            target_pid: self.remote_pid.unwrap(),
                        };

                        ClientMessage::ContextSetupDeviceCollectionCallback(fds)
                    } else {
                        warn!("Failed to setup RPC client");
                        error(cubeb::Error::error())
                    }
                } else {
                    warn!("Failed to create RPC pair");
                    error(cubeb::Error::error())
                }
            }

            ServerMessage::ContextRegisterDeviceCollectionChanged(device_type, enable) => self
                .process_register_device_collection_changed(
                    context,
                    manager,
                    cubeb::DeviceType::from_bits_truncate(device_type),
                    enable,
                )
                .unwrap_or_else(error),

            #[cfg(target_os = "linux")]
            ServerMessage::PromoteThreadToRealTime(thread_info) => {
                let info = RtPriorityThreadInfo::deserialize(thread_info);
                // The reply is `ThreadPromoted` whether or not promotion
                // succeeded; the outcome is only logged.
                match promote_thread_to_real_time(info, 0, 48000) {
                    Ok(_) => {
                        info!("Promotion of content process thread to real-time OK");
                    }
                    Err(_) => {
                        warn!("Promotion of content process thread to real-time error");
                    }
                }
                ClientMessage::ThreadPromoted
            }
        };

        trace!("process_msg: req={:?}, resp={:?}", msg, resp);

        resp
    }
635 
process_register_device_collection_changed( &mut self, context: &cubeb::Context, manager: &mut CubebDeviceCollectionManager, devtype: cubeb::DeviceType, enable: bool, ) -> cubeb::Result<ClientMessage>636     fn process_register_device_collection_changed(
637         &mut self,
638         context: &cubeb::Context,
639         manager: &mut CubebDeviceCollectionManager,
640         devtype: cubeb::DeviceType,
641         enable: bool,
642     ) -> cubeb::Result<ClientMessage> {
643         if devtype == cubeb::DeviceType::UNKNOWN {
644             return Err(cubeb::Error::invalid_parameter());
645         }
646 
647         assert!(self.cbs.is_some());
648         let cbs = self.cbs.as_ref().unwrap();
649 
650         if enable {
651             manager.register(context, cbs, devtype)
652         } else {
653             manager.unregister(context, cbs, devtype)
654         }
655         .map(|_| ClientMessage::ContextRegisteredDeviceCollectionChanged)
656     }
657 
658     // Stream create is special, so it's been separated from process_msg.
process_stream_create(&mut self, params: &StreamCreateParams) -> Result<ClientMessage>659     fn process_stream_create(&mut self, params: &StreamCreateParams) -> Result<ClientMessage> {
660         fn frame_size_in_bytes(params: Option<&StreamParams>) -> u16 {
661             params
662                 .map(|p| {
663                     let format = p.format.into();
664                     let sample_size = match format {
665                         cubeb::SampleFormat::S16LE
666                         | cubeb::SampleFormat::S16BE
667                         | cubeb::SampleFormat::S16NE => 2,
668                         cubeb::SampleFormat::Float32LE
669                         | cubeb::SampleFormat::Float32BE
670                         | cubeb::SampleFormat::Float32NE => 4,
671                     };
672                     let channel_count = p.channels as u16;
673                     sample_size * channel_count
674                 })
675                 .unwrap_or(0u16)
676         }
677 
678         // Create the callback handling struct which is attached the cubeb stream.
679         let input_frame_size = frame_size_in_bytes(params.input_stream_params.as_ref());
680         let output_frame_size = frame_size_in_bytes(params.output_stream_params.as_ref());
681 
682         let (ipc_server, ipc_client) = MessageStream::anonymous_ipc_pair()?;
683         debug!("Created callback pair: {:?}-{:?}", ipc_server, ipc_client);
684         let shm_id = get_shm_id();
685         let (input_shm, input_file) =
686             SharedMem::new(&format!("{}-input", shm_id), audioipc::SHM_AREA_SIZE)?;
687         let (output_shm, output_file) =
688             SharedMem::new(&format!("{}-output", shm_id), audioipc::SHM_AREA_SIZE)?;
689 
690         // This code is currently running on the Client/Server RPC
691         // handling thread.  We need to move the registration of the
692         // bind_client to the callback RPC handling thread.  This is
693         // done by spawning a future on `handle`.
694         let (tx, rx) = oneshot::channel();
695         self.handle
696             .spawn(futures::future::lazy(move || {
697                 let handle = reactor::Handle::default();
698                 let stream = ipc_server.into_tokio_ipc(&handle).unwrap();
699                 let transport = framed(stream, Default::default());
700                 let rpc = rpc::bind_client::<CallbackClient>(transport);
701                 drop(tx.send(rpc));
702                 Ok(())
703             }))
704             .expect("Failed to spawn CallbackClient");
705 
706         let rpc: rpc::ClientProxy<CallbackReq, CallbackResp> = match rx.wait() {
707             Ok(rpc) => rpc,
708             Err(_) => bail!("Failed to create callback rpc."),
709         };
710 
711         // TODO: The lowest comms layer expects exactly 3 PlatformHandles, so we always configure both sides of the shm.
712         // ServerStreamCallbacks only needs the active shm, so drop any unused shm now.
713         let input_shm = params.input_stream_params.and(Some(input_shm));
714         let output_shm = params.output_stream_params.and(Some(output_shm));
715 
716         let cbs = Box::new(ServerStreamCallbacks {
717             input_frame_size,
718             output_frame_size,
719             input_shm,
720             output_shm,
721             rpc,
722         });
723 
724         let entry = self.streams.vacant_entry();
725         let key = entry.key();
726         debug!("Registering stream {:?}", key);
727 
728         entry.insert(ServerStream { stream: None, cbs });
729 
730         Ok(ClientMessage::StreamCreated(StreamCreate {
731             token: key,
732             platform_handles: [PlatformHandle::from(ipc_client), input_file, output_file],
733             target_pid: self.remote_pid.unwrap(),
734         }))
735     }
736 
737     // Stream init is special, so it's been separated from process_msg.
process_stream_init( &mut self, context: &cubeb::Context, stm_tok: usize, params: &StreamInitParams, ) -> Result<ClientMessage>738     fn process_stream_init(
739         &mut self,
740         context: &cubeb::Context,
741         stm_tok: usize,
742         params: &StreamInitParams,
743     ) -> Result<ClientMessage> {
744         // Create cubeb stream from params
745         let stream_name = params
746             .stream_name
747             .as_ref()
748             .and_then(|name| CStr::from_bytes_with_nul(name).ok());
749 
750         // Map IPC handle back to cubeb_devid.
751         let input_device = self.devidmap.handle_to_id(params.input_device) as *const _;
752         let input_stream_params = params.input_stream_params.as_ref().map(|isp| unsafe {
753             cubeb::StreamParamsRef::from_ptr(isp as *const StreamParams as *mut _)
754         });
755 
756         // Map IPC handle back to cubeb_devid.
757         let output_device = self.devidmap.handle_to_id(params.output_device) as *const _;
758         let output_stream_params = params.output_stream_params.as_ref().map(|osp| unsafe {
759             cubeb::StreamParamsRef::from_ptr(osp as *const StreamParams as *mut _)
760         });
761 
762         let latency = params.latency_frames;
763 
764         let server_stream = &mut self.streams[stm_tok];
765         assert!(size_of::<Box<ServerStreamCallbacks>>() == size_of::<usize>());
766         let user_ptr = server_stream.cbs.as_ref() as *const ServerStreamCallbacks as *mut c_void;
767 
768         let stream = unsafe {
769             let stream = context.stream_init(
770                 stream_name,
771                 input_device,
772                 input_stream_params,
773                 output_device,
774                 output_stream_params,
775                 latency,
776                 Some(data_cb_c),
777                 Some(state_cb_c),
778                 user_ptr,
779             );
780             match stream {
781                 Ok(stream) => stream,
782                 Err(e) => {
783                     debug!("Unregistering stream {:?} (stream error {:?})", stm_tok, e);
784                     self.streams.remove(stm_tok);
785                     return Err(e.into());
786                 }
787             }
788         };
789 
790         server_stream.stream = Some(stream);
791 
792         Ok(ClientMessage::StreamInitialized)
793     }
794 }
795 
796 // C callable callbacks
data_cb_c( _: *mut ffi::cubeb_stream, user_ptr: *mut c_void, input_buffer: *const c_void, output_buffer: *mut c_void, nframes: c_long, ) -> c_long797 unsafe extern "C" fn data_cb_c(
798     _: *mut ffi::cubeb_stream,
799     user_ptr: *mut c_void,
800     input_buffer: *const c_void,
801     output_buffer: *mut c_void,
802     nframes: c_long,
803 ) -> c_long {
804     let ok = panic::catch_unwind(|| {
805         let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks);
806         let input = if input_buffer.is_null() {
807             &[]
808         } else {
809             let nbytes = nframes * c_long::from(cbs.input_frame_size);
810             slice::from_raw_parts(input_buffer as *const u8, nbytes as usize)
811         };
812         let output: &mut [u8] = if output_buffer.is_null() {
813             &mut []
814         } else {
815             let nbytes = nframes * c_long::from(cbs.output_frame_size);
816             slice::from_raw_parts_mut(output_buffer as *mut u8, nbytes as usize)
817         };
818         cbs.data_callback(input, output, nframes as isize) as c_long
819     });
820     // TODO: Return a CUBEB_ERROR result here once
821     // https://github.com/kinetiknz/cubeb/issues/553 is fixed.
822     ok.unwrap_or(0)
823 }
824 
state_cb_c( _: *mut ffi::cubeb_stream, user_ptr: *mut c_void, state: ffi::cubeb_state, )825 unsafe extern "C" fn state_cb_c(
826     _: *mut ffi::cubeb_stream,
827     user_ptr: *mut c_void,
828     state: ffi::cubeb_state,
829 ) {
830     let ok = panic::catch_unwind(|| {
831         let state = cubeb::State::from(state);
832         let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks);
833         cbs.state_callback(state);
834     });
835     ok.expect("State callback panicked");
836 }
837 
device_change_cb_c(user_ptr: *mut c_void)838 unsafe extern "C" fn device_change_cb_c(user_ptr: *mut c_void) {
839     let ok = panic::catch_unwind(|| {
840         let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks);
841         cbs.device_change_callback();
842     });
843     ok.expect("Device change callback panicked");
844 }
845 
device_collection_changed_input_cb_c( _: *mut ffi::cubeb, user_ptr: *mut c_void, )846 unsafe extern "C" fn device_collection_changed_input_cb_c(
847     _: *mut ffi::cubeb,
848     user_ptr: *mut c_void,
849 ) {
850     let ok = panic::catch_unwind(|| {
851         let manager = &mut *(user_ptr as *mut CubebDeviceCollectionManager);
852         manager.device_collection_changed_callback(ffi::CUBEB_DEVICE_TYPE_INPUT);
853     });
854     ok.expect("Collection changed (input) callback panicked");
855 }
856 
device_collection_changed_output_cb_c( _: *mut ffi::cubeb, user_ptr: *mut c_void, )857 unsafe extern "C" fn device_collection_changed_output_cb_c(
858     _: *mut ffi::cubeb,
859     user_ptr: *mut c_void,
860 ) {
861     let ok = panic::catch_unwind(|| {
862         let manager = &mut *(user_ptr as *mut CubebDeviceCollectionManager);
863         manager.device_collection_changed_callback(ffi::CUBEB_DEVICE_TYPE_OUTPUT);
864     });
865     ok.expect("Collection changed (output) callback panicked");
866 }
867