1 // Copyright © 2017 Mozilla Foundation
2 //
3 // This program is made available under an ISC-style license.  See the
4 // accompanying file LICENSE for details
5 
6 use crate::ClientContext;
7 use crate::{assert_not_in_callback, run_in_callback};
8 use audioipc::messages::StreamCreateParams;
9 use audioipc::messages::{self, CallbackReq, CallbackResp, ClientMessage, ServerMessage};
10 use audioipc::shm::SharedMem;
11 use audioipc::{rpccore, sys};
12 use cubeb_backend::{ffi, DeviceRef, Error, Result, Stream, StreamOps};
13 use std::ffi::{CStr, CString};
14 use std::os::raw::c_void;
15 use std::ptr;
16 use std::sync::mpsc;
17 use std::sync::{Arc, Mutex};
18 
19 pub struct Device(ffi::cubeb_device);
20 
21 impl Drop for Device {
drop(&mut self)22     fn drop(&mut self) {
23         unsafe {
24             if !self.0.input_name.is_null() {
25                 let _ = CString::from_raw(self.0.input_name as *mut _);
26             }
27             if !self.0.output_name.is_null() {
28                 let _ = CString::from_raw(self.0.output_name as *mut _);
29             }
30         }
31     }
32 }
33 
// ClientStream's layout *must* match cubeb.c's `struct cubeb_stream` for the
// common fields.
//
// Client-side handle for a stream that lives in the remote audio server.
// Every stream operation is forwarded to the server over the context's RPC
// connection, using `token` to identify the stream.
#[repr(C)]
#[derive(Debug)]
pub struct ClientStream<'ctx> {
    // This must be a reference to Context for cubeb, cubeb accesses
    // stream methods via stream->context->ops
    context: &'ctx ClientContext,
    // Opaque pointer supplied by the caller at init time; it is passed back
    // to the user's callbacks by CallbackServer.
    user_ptr: *mut c_void,
    // Stream identifier taken from the server's StreamCreated response.
    token: usize,
    // Currently registered device-changed callback (None when unregistered).
    // Shared with CallbackServer, which invokes it on DeviceChange requests.
    device_change_cb: Arc<Mutex<ffi::cubeb_device_changed_callback>>,
    // Signals ClientStream that CallbackServer has dropped.
    shutdown_rx: mpsc::Receiver<()>,
}
48 
// Receives callback requests (data, state, device change) sent by the remote
// server for one stream and dispatches them to the user-supplied cubeb
// callbacks.
struct CallbackServer {
    // Shared memory area used to exchange audio data with the remote server.
    // Input and output share this same backing memory.
    shm: SharedMem,
    // Scratch buffer the input is copied into before the data callback runs.
    // Only present for duplex streams (both input and output params given).
    duplex_input: Option<Vec<u8>>,
    // User data callback, invoked for CallbackReq::Data.
    data_cb: ffi::cubeb_data_callback,
    // User state callback, invoked for CallbackReq::State.
    state_cb: ffi::cubeb_state_callback,
    // The caller's user pointer stored as an integer; it is cast back to a
    // raw pointer when a callback is invoked.
    user_ptr: usize,
    // Device-changed callback slot shared with the owning ClientStream.
    device_change_cb: Arc<Mutex<ffi::cubeb_device_changed_callback>>,
    // Signals ClientStream that CallbackServer has dropped.
    _shutdown_tx: mpsc::Sender<()>,
}
59 
60 impl rpccore::Server for CallbackServer {
61     type ServerMessage = CallbackReq;
62     type ClientMessage = CallbackResp;
63 
process(&mut self, req: Self::ServerMessage) -> Self::ClientMessage64     fn process(&mut self, req: Self::ServerMessage) -> Self::ClientMessage {
65         match req {
66             CallbackReq::Data {
67                 nframes,
68                 input_frame_size,
69                 output_frame_size,
70             } => {
71                 trace!(
72                     "stream_thread: Data Callback: nframes={} input_fs={} output_fs={}",
73                     nframes,
74                     input_frame_size,
75                     output_frame_size,
76                 );
77 
78                 let input_nbytes = nframes as usize * input_frame_size;
79                 let output_nbytes = nframes as usize * output_frame_size;
80 
81                 // Input and output reuse the same shmem backing.  Unfortunately, cubeb's data_callback isn't
82                 // specified in such a way that would require the callee to consume all of the input before
83                 // writing to the output (i.e., it is passed as two pointers that aren't expected to alias).
84                 // That means we need to copy the input here.
85                 if let Some(buf) = &mut self.duplex_input {
86                     assert!(input_nbytes > 0);
87                     assert!(buf.capacity() >= input_nbytes);
88                     unsafe {
89                         let input = self.shm.get_slice(input_nbytes).unwrap();
90                         ptr::copy_nonoverlapping(input.as_ptr(), buf.as_mut_ptr(), input.len());
91                     }
92                 }
93 
94                 run_in_callback(|| {
95                     let nframes = unsafe {
96                         self.data_cb.unwrap()(
97                             ptr::null_mut(), // https://github.com/kinetiknz/cubeb/issues/518
98                             self.user_ptr as *mut c_void,
99                             if let Some(buf) = &mut self.duplex_input {
100                                 buf.as_mut_ptr()
101                             } else {
102                                 self.shm.get_slice(input_nbytes).unwrap().as_ptr()
103                             } as *const _,
104                             self.shm.get_mut_slice(output_nbytes).unwrap().as_mut_ptr() as *mut _,
105                             nframes as _,
106                         )
107                     };
108 
109                     CallbackResp::Data(nframes as isize)
110                 })
111             }
112             CallbackReq::State(state) => {
113                 trace!("stream_thread: State Callback: {:?}", state);
114                 run_in_callback(|| unsafe {
115                     self.state_cb.unwrap()(ptr::null_mut(), self.user_ptr as *mut _, state);
116                 });
117 
118                 CallbackResp::State
119             }
120             CallbackReq::DeviceChange => {
121                 run_in_callback(|| {
122                     let cb = *self.device_change_cb.lock().unwrap();
123                     if let Some(cb) = cb {
124                         unsafe {
125                             cb(self.user_ptr as *mut _);
126                         }
127                     } else {
128                         warn!("DeviceChange received with null callback");
129                     }
130                 });
131 
132                 CallbackResp::DeviceChange
133             }
134         }
135     }
136 }
137 
138 impl<'ctx> ClientStream<'ctx> {
init( ctx: &'ctx ClientContext, init_params: messages::StreamInitParams, data_callback: ffi::cubeb_data_callback, state_callback: ffi::cubeb_state_callback, user_ptr: *mut c_void, ) -> Result<Stream>139     fn init(
140         ctx: &'ctx ClientContext,
141         init_params: messages::StreamInitParams,
142         data_callback: ffi::cubeb_data_callback,
143         state_callback: ffi::cubeb_state_callback,
144         user_ptr: *mut c_void,
145     ) -> Result<Stream> {
146         assert_not_in_callback();
147 
148         let rpc = ctx.rpc();
149         let create_params = StreamCreateParams {
150             input_stream_params: init_params.input_stream_params,
151             output_stream_params: init_params.output_stream_params,
152         };
153         let mut data = send_recv!(rpc, StreamCreate(create_params) => StreamCreated())?;
154 
155         debug!(
156             "token = {}, handle = {:?} area_size = {:?}",
157             data.token, data.shm_handle, data.shm_area_size
158         );
159 
160         let shm =
161             match unsafe { SharedMem::from(data.shm_handle.take_handle(), data.shm_area_size) } {
162                 Ok(shm) => shm,
163                 Err(e) => {
164                     warn!(
165                         "SharedMem client mapping failed (size={}, err={:?})",
166                         data.shm_area_size, e
167                     );
168                     return Err(Error::default());
169                 }
170             };
171 
172         let duplex_input = if let (Some(_), Some(_)) = (
173             init_params.input_stream_params,
174             init_params.output_stream_params,
175         ) {
176             let mut duplex_input = Vec::new();
177             match duplex_input.try_reserve_exact(data.shm_area_size) {
178                 Ok(()) => Some(duplex_input),
179                 Err(e) => {
180                     warn!(
181                         "duplex_input allocation failed (size={}, err={:?})",
182                         data.shm_area_size, e
183                     );
184                     return Err(Error::default());
185                 }
186             }
187         } else {
188             None
189         };
190 
191         let mut stream =
192             send_recv!(rpc, StreamInit(data.token, init_params) => StreamInitialized())?;
193         let stream = unsafe { sys::Pipe::from_raw_handle(stream.take_handle()) };
194 
195         let user_data = user_ptr as usize;
196 
197         let null_cb: ffi::cubeb_device_changed_callback = None;
198         let device_change_cb = Arc::new(Mutex::new(null_cb));
199 
200         let (_shutdown_tx, shutdown_rx) = mpsc::channel();
201 
202         let server = CallbackServer {
203             shm,
204             duplex_input,
205             data_cb: data_callback,
206             state_cb: state_callback,
207             user_ptr: user_data,
208             device_change_cb: device_change_cb.clone(),
209             _shutdown_tx,
210         };
211 
212         ctx.callback_handle()
213             .bind_server(server, stream)
214             .map_err(|_| Error::default())?;
215 
216         let stream = Box::into_raw(Box::new(ClientStream {
217             context: ctx,
218             user_ptr,
219             token: data.token,
220             device_change_cb,
221             shutdown_rx,
222         }));
223         Ok(unsafe { Stream::from_ptr(stream as *mut _) })
224     }
225 }
226 
impl Drop for ClientStream<'_> {
    /// Destroys the remote stream and then blocks until the stream's
    /// `CallbackServer` has been dropped.
    ///
    /// The result of the `StreamDestroy` RPC is deliberately ignored: there is
    /// nothing more to do from a destructor if it fails.
    fn drop(&mut self) {
        debug!("ClientStream drop");
        let rpc = self.context.rpc();
        let _ = send_recv!(rpc, StreamDestroy(self.token) => StreamDestroyed);
        debug!("ClientStream drop - stream destroyed");
        // Wait for CallbackServer to shutdown.  The remote server drops the RPC
        // connection during StreamDestroy, which will cause CallbackServer to drop
        // once the connection close is detected.  Dropping CallbackServer will
        // cause the shutdown channel to error on recv, which we rely on to
        // synchronize with CallbackServer dropping.
        let _ = self.shutdown_rx.recv();
        debug!("ClientStream dropped");
    }
}
242 
243 impl StreamOps for ClientStream<'_> {
start(&mut self) -> Result<()>244     fn start(&mut self) -> Result<()> {
245         assert_not_in_callback();
246         let rpc = self.context.rpc();
247         send_recv!(rpc, StreamStart(self.token) => StreamStarted)
248     }
249 
stop(&mut self) -> Result<()>250     fn stop(&mut self) -> Result<()> {
251         assert_not_in_callback();
252         let rpc = self.context.rpc();
253         send_recv!(rpc, StreamStop(self.token) => StreamStopped)
254     }
255 
position(&mut self) -> Result<u64>256     fn position(&mut self) -> Result<u64> {
257         assert_not_in_callback();
258         let rpc = self.context.rpc();
259         send_recv!(rpc, StreamGetPosition(self.token) => StreamPosition())
260     }
261 
latency(&mut self) -> Result<u32>262     fn latency(&mut self) -> Result<u32> {
263         assert_not_in_callback();
264         let rpc = self.context.rpc();
265         send_recv!(rpc, StreamGetLatency(self.token) => StreamLatency())
266     }
267 
input_latency(&mut self) -> Result<u32>268     fn input_latency(&mut self) -> Result<u32> {
269         assert_not_in_callback();
270         let rpc = self.context.rpc();
271         send_recv!(rpc, StreamGetInputLatency(self.token) => StreamInputLatency())
272     }
273 
set_volume(&mut self, volume: f32) -> Result<()>274     fn set_volume(&mut self, volume: f32) -> Result<()> {
275         assert_not_in_callback();
276         let rpc = self.context.rpc();
277         send_recv!(rpc, StreamSetVolume(self.token, volume) => StreamVolumeSet)
278     }
279 
set_name(&mut self, name: &CStr) -> Result<()>280     fn set_name(&mut self, name: &CStr) -> Result<()> {
281         assert_not_in_callback();
282         let rpc = self.context.rpc();
283         send_recv!(rpc, StreamSetName(self.token, name.to_owned()) => StreamNameSet)
284     }
285 
current_device(&mut self) -> Result<&DeviceRef>286     fn current_device(&mut self) -> Result<&DeviceRef> {
287         assert_not_in_callback();
288         let rpc = self.context.rpc();
289         match send_recv!(rpc, StreamGetCurrentDevice(self.token) => StreamCurrentDevice()) {
290             Ok(d) => Ok(unsafe { DeviceRef::from_ptr(Box::into_raw(Box::new(d.into()))) }),
291             Err(e) => Err(e),
292         }
293     }
294 
device_destroy(&mut self, device: &DeviceRef) -> Result<()>295     fn device_destroy(&mut self, device: &DeviceRef) -> Result<()> {
296         assert_not_in_callback();
297         if device.as_ptr().is_null() {
298             Err(Error::error())
299         } else {
300             unsafe {
301                 let _: Box<Device> = Box::from_raw(device.as_ptr() as *mut _);
302             }
303             Ok(())
304         }
305     }
306 
register_device_changed_callback( &mut self, device_changed_callback: ffi::cubeb_device_changed_callback, ) -> Result<()>307     fn register_device_changed_callback(
308         &mut self,
309         device_changed_callback: ffi::cubeb_device_changed_callback,
310     ) -> Result<()> {
311         assert_not_in_callback();
312         let rpc = self.context.rpc();
313         let enable = device_changed_callback.is_some();
314         *self.device_change_cb.lock().unwrap() = device_changed_callback;
315         send_recv!(rpc, StreamRegisterDeviceChangeCallback(self.token, enable) => StreamRegisterDeviceChangeCallback)
316     }
317 }
318 
init( ctx: &ClientContext, init_params: messages::StreamInitParams, data_callback: ffi::cubeb_data_callback, state_callback: ffi::cubeb_state_callback, user_ptr: *mut c_void, ) -> Result<Stream>319 pub fn init(
320     ctx: &ClientContext,
321     init_params: messages::StreamInitParams,
322     data_callback: ffi::cubeb_data_callback,
323     state_callback: ffi::cubeb_state_callback,
324     user_ptr: *mut c_void,
325 ) -> Result<Stream> {
326     let stm = ClientStream::init(ctx, init_params, data_callback, state_callback, user_ptr)?;
327     debug_assert_eq!(stm.user_ptr(), user_ptr);
328     Ok(stm)
329 }
330