1 // Copyright © 2017 Mozilla Foundation
2 //
3 // This program is made available under an ISC-style license. See the
4 // accompanying file LICENSE for details
5
6 use crate::ClientContext;
7 use crate::{assert_not_in_callback, run_in_callback};
8 use audioipc::messages::StreamCreateParams;
9 use audioipc::messages::{self, CallbackReq, CallbackResp, ClientMessage, ServerMessage};
10 use audioipc::shm::SharedMem;
11 use audioipc::{rpccore, sys};
12 use cubeb_backend::{ffi, DeviceRef, Error, Result, Stream, StreamOps};
13 use std::ffi::{CStr, CString};
14 use std::os::raw::c_void;
15 use std::ptr;
16 use std::sync::mpsc;
17 use std::sync::{Arc, Mutex};
18
19 pub struct Device(ffi::cubeb_device);
20
21 impl Drop for Device {
drop(&mut self)22 fn drop(&mut self) {
23 unsafe {
24 if !self.0.input_name.is_null() {
25 let _ = CString::from_raw(self.0.input_name as *mut _);
26 }
27 if !self.0.output_name.is_null() {
28 let _ = CString::from_raw(self.0.output_name as *mut _);
29 }
30 }
31 }
32 }
33
// ClientStream's layout *must* match cubeb.c's `struct cubeb_stream` for the
// common fields.
//
// Do not reorder the leading fields: the struct is `#[repr(C)]` precisely so
// that the declaration order is the in-memory order.
#[repr(C)]
#[derive(Debug)]
pub struct ClientStream<'ctx> {
    // This must be a reference to Context for cubeb, cubeb accesses
    // stream methods via stream->context->ops
    context: &'ctx ClientContext,
    // Opaque pointer supplied by the caller of `init`; stored unchanged.
    user_ptr: *mut c_void,
    // Token returned by the server in reply to `StreamCreate`; sent with every
    // subsequent per-stream RPC to identify this stream.
    token: usize,
    // Currently registered device-changed callback. The same `Arc` is cloned
    // into the `CallbackServer`, which reads it when a `DeviceChange` request
    // arrives.
    device_change_cb: Arc<Mutex<ffi::cubeb_device_changed_callback>>,
    // Signals ClientStream that CallbackServer has dropped.
    // `Drop` blocks on `recv()` here; the call returns once the sending half
    // held by CallbackServer has been dropped.
    shutdown_rx: mpsc::Receiver<()>,
}
48
// Per-stream state used to service callback requests (`CallbackReq`) and turn
// them into invocations of the user's cubeb callbacks.
//
// NOTE: fields are dropped in declaration order, so `_shutdown_tx` is released
// last; keep it at the end.
struct CallbackServer {
    // Shared memory area mapped from the handle returned by `StreamCreate`.
    // Input and output audio both go through this same mapping.
    shm: SharedMem,
    // Private staging buffer for input data. `Some` only when the stream was
    // initialized with both input and output parameters (duplex); capacity is
    // reserved up front to the size of the shared memory area.
    duplex_input: Option<Vec<u8>>,
    // User data callback, invoked for `CallbackReq::Data`.
    data_cb: ffi::cubeb_data_callback,
    // User state callback, invoked for `CallbackReq::State`.
    state_cb: ffi::cubeb_state_callback,
    // The caller's `user_ptr`, stored as an integer and cast back to a pointer
    // when a callback is invoked.
    // NOTE(review): presumably kept as `usize` so the struct can be moved to
    // the callback thread -- confirm.
    user_ptr: usize,
    // Shared with `ClientStream`, which updates it when a device-changed
    // callback is registered.
    device_change_cb: Arc<Mutex<ffi::cubeb_device_changed_callback>>,
    // Signals ClientStream that CallbackServer has dropped.
    _shutdown_tx: mpsc::Sender<()>,
}
59
60 impl rpccore::Server for CallbackServer {
61 type ServerMessage = CallbackReq;
62 type ClientMessage = CallbackResp;
63
process(&mut self, req: Self::ServerMessage) -> Self::ClientMessage64 fn process(&mut self, req: Self::ServerMessage) -> Self::ClientMessage {
65 match req {
66 CallbackReq::Data {
67 nframes,
68 input_frame_size,
69 output_frame_size,
70 } => {
71 trace!(
72 "stream_thread: Data Callback: nframes={} input_fs={} output_fs={}",
73 nframes,
74 input_frame_size,
75 output_frame_size,
76 );
77
78 let input_nbytes = nframes as usize * input_frame_size;
79 let output_nbytes = nframes as usize * output_frame_size;
80
81 // Input and output reuse the same shmem backing. Unfortunately, cubeb's data_callback isn't
82 // specified in such a way that would require the callee to consume all of the input before
83 // writing to the output (i.e., it is passed as two pointers that aren't expected to alias).
84 // That means we need to copy the input here.
85 if let Some(buf) = &mut self.duplex_input {
86 assert!(input_nbytes > 0);
87 assert!(buf.capacity() >= input_nbytes);
88 unsafe {
89 let input = self.shm.get_slice(input_nbytes).unwrap();
90 ptr::copy_nonoverlapping(input.as_ptr(), buf.as_mut_ptr(), input.len());
91 }
92 }
93
94 run_in_callback(|| {
95 let nframes = unsafe {
96 self.data_cb.unwrap()(
97 ptr::null_mut(), // https://github.com/kinetiknz/cubeb/issues/518
98 self.user_ptr as *mut c_void,
99 if let Some(buf) = &mut self.duplex_input {
100 buf.as_mut_ptr()
101 } else {
102 self.shm.get_slice(input_nbytes).unwrap().as_ptr()
103 } as *const _,
104 self.shm.get_mut_slice(output_nbytes).unwrap().as_mut_ptr() as *mut _,
105 nframes as _,
106 )
107 };
108
109 CallbackResp::Data(nframes as isize)
110 })
111 }
112 CallbackReq::State(state) => {
113 trace!("stream_thread: State Callback: {:?}", state);
114 run_in_callback(|| unsafe {
115 self.state_cb.unwrap()(ptr::null_mut(), self.user_ptr as *mut _, state);
116 });
117
118 CallbackResp::State
119 }
120 CallbackReq::DeviceChange => {
121 run_in_callback(|| {
122 let cb = *self.device_change_cb.lock().unwrap();
123 if let Some(cb) = cb {
124 unsafe {
125 cb(self.user_ptr as *mut _);
126 }
127 } else {
128 warn!("DeviceChange received with null callback");
129 }
130 });
131
132 CallbackResp::DeviceChange
133 }
134 }
135 }
136 }
137
138 impl<'ctx> ClientStream<'ctx> {
init( ctx: &'ctx ClientContext, init_params: messages::StreamInitParams, data_callback: ffi::cubeb_data_callback, state_callback: ffi::cubeb_state_callback, user_ptr: *mut c_void, ) -> Result<Stream>139 fn init(
140 ctx: &'ctx ClientContext,
141 init_params: messages::StreamInitParams,
142 data_callback: ffi::cubeb_data_callback,
143 state_callback: ffi::cubeb_state_callback,
144 user_ptr: *mut c_void,
145 ) -> Result<Stream> {
146 assert_not_in_callback();
147
148 let rpc = ctx.rpc();
149 let create_params = StreamCreateParams {
150 input_stream_params: init_params.input_stream_params,
151 output_stream_params: init_params.output_stream_params,
152 };
153 let mut data = send_recv!(rpc, StreamCreate(create_params) => StreamCreated())?;
154
155 debug!(
156 "token = {}, handle = {:?} area_size = {:?}",
157 data.token, data.shm_handle, data.shm_area_size
158 );
159
160 let shm =
161 match unsafe { SharedMem::from(data.shm_handle.take_handle(), data.shm_area_size) } {
162 Ok(shm) => shm,
163 Err(e) => {
164 warn!(
165 "SharedMem client mapping failed (size={}, err={:?})",
166 data.shm_area_size, e
167 );
168 return Err(Error::default());
169 }
170 };
171
172 let duplex_input = if let (Some(_), Some(_)) = (
173 init_params.input_stream_params,
174 init_params.output_stream_params,
175 ) {
176 let mut duplex_input = Vec::new();
177 match duplex_input.try_reserve_exact(data.shm_area_size) {
178 Ok(()) => Some(duplex_input),
179 Err(e) => {
180 warn!(
181 "duplex_input allocation failed (size={}, err={:?})",
182 data.shm_area_size, e
183 );
184 return Err(Error::default());
185 }
186 }
187 } else {
188 None
189 };
190
191 let mut stream =
192 send_recv!(rpc, StreamInit(data.token, init_params) => StreamInitialized())?;
193 let stream = unsafe { sys::Pipe::from_raw_handle(stream.take_handle()) };
194
195 let user_data = user_ptr as usize;
196
197 let null_cb: ffi::cubeb_device_changed_callback = None;
198 let device_change_cb = Arc::new(Mutex::new(null_cb));
199
200 let (_shutdown_tx, shutdown_rx) = mpsc::channel();
201
202 let server = CallbackServer {
203 shm,
204 duplex_input,
205 data_cb: data_callback,
206 state_cb: state_callback,
207 user_ptr: user_data,
208 device_change_cb: device_change_cb.clone(),
209 _shutdown_tx,
210 };
211
212 ctx.callback_handle()
213 .bind_server(server, stream)
214 .map_err(|_| Error::default())?;
215
216 let stream = Box::into_raw(Box::new(ClientStream {
217 context: ctx,
218 user_ptr,
219 token: data.token,
220 device_change_cb,
221 shutdown_rx,
222 }));
223 Ok(unsafe { Stream::from_ptr(stream as *mut _) })
224 }
225 }
226
227 impl Drop for ClientStream<'_> {
drop(&mut self)228 fn drop(&mut self) {
229 debug!("ClientStream drop");
230 let rpc = self.context.rpc();
231 let _ = send_recv!(rpc, StreamDestroy(self.token) => StreamDestroyed);
232 debug!("ClientStream drop - stream destroyed");
233 // Wait for CallbackServer to shutdown. The remote server drops the RPC
234 // connection during StreamDestroy, which will cause CallbackServer to drop
235 // once the connection close is detected. Dropping CallbackServer will
236 // cause the shutdown channel to error on recv, which we rely on to
237 // synchronize with CallbackServer dropping.
238 let _ = self.shutdown_rx.recv();
239 debug!("ClientStream dropped");
240 }
241 }
242
243 impl StreamOps for ClientStream<'_> {
start(&mut self) -> Result<()>244 fn start(&mut self) -> Result<()> {
245 assert_not_in_callback();
246 let rpc = self.context.rpc();
247 send_recv!(rpc, StreamStart(self.token) => StreamStarted)
248 }
249
stop(&mut self) -> Result<()>250 fn stop(&mut self) -> Result<()> {
251 assert_not_in_callback();
252 let rpc = self.context.rpc();
253 send_recv!(rpc, StreamStop(self.token) => StreamStopped)
254 }
255
position(&mut self) -> Result<u64>256 fn position(&mut self) -> Result<u64> {
257 assert_not_in_callback();
258 let rpc = self.context.rpc();
259 send_recv!(rpc, StreamGetPosition(self.token) => StreamPosition())
260 }
261
latency(&mut self) -> Result<u32>262 fn latency(&mut self) -> Result<u32> {
263 assert_not_in_callback();
264 let rpc = self.context.rpc();
265 send_recv!(rpc, StreamGetLatency(self.token) => StreamLatency())
266 }
267
input_latency(&mut self) -> Result<u32>268 fn input_latency(&mut self) -> Result<u32> {
269 assert_not_in_callback();
270 let rpc = self.context.rpc();
271 send_recv!(rpc, StreamGetInputLatency(self.token) => StreamInputLatency())
272 }
273
set_volume(&mut self, volume: f32) -> Result<()>274 fn set_volume(&mut self, volume: f32) -> Result<()> {
275 assert_not_in_callback();
276 let rpc = self.context.rpc();
277 send_recv!(rpc, StreamSetVolume(self.token, volume) => StreamVolumeSet)
278 }
279
set_name(&mut self, name: &CStr) -> Result<()>280 fn set_name(&mut self, name: &CStr) -> Result<()> {
281 assert_not_in_callback();
282 let rpc = self.context.rpc();
283 send_recv!(rpc, StreamSetName(self.token, name.to_owned()) => StreamNameSet)
284 }
285
current_device(&mut self) -> Result<&DeviceRef>286 fn current_device(&mut self) -> Result<&DeviceRef> {
287 assert_not_in_callback();
288 let rpc = self.context.rpc();
289 match send_recv!(rpc, StreamGetCurrentDevice(self.token) => StreamCurrentDevice()) {
290 Ok(d) => Ok(unsafe { DeviceRef::from_ptr(Box::into_raw(Box::new(d.into()))) }),
291 Err(e) => Err(e),
292 }
293 }
294
device_destroy(&mut self, device: &DeviceRef) -> Result<()>295 fn device_destroy(&mut self, device: &DeviceRef) -> Result<()> {
296 assert_not_in_callback();
297 if device.as_ptr().is_null() {
298 Err(Error::error())
299 } else {
300 unsafe {
301 let _: Box<Device> = Box::from_raw(device.as_ptr() as *mut _);
302 }
303 Ok(())
304 }
305 }
306
register_device_changed_callback( &mut self, device_changed_callback: ffi::cubeb_device_changed_callback, ) -> Result<()>307 fn register_device_changed_callback(
308 &mut self,
309 device_changed_callback: ffi::cubeb_device_changed_callback,
310 ) -> Result<()> {
311 assert_not_in_callback();
312 let rpc = self.context.rpc();
313 let enable = device_changed_callback.is_some();
314 *self.device_change_cb.lock().unwrap() = device_changed_callback;
315 send_recv!(rpc, StreamRegisterDeviceChangeCallback(self.token, enable) => StreamRegisterDeviceChangeCallback)
316 }
317 }
318
init( ctx: &ClientContext, init_params: messages::StreamInitParams, data_callback: ffi::cubeb_data_callback, state_callback: ffi::cubeb_state_callback, user_ptr: *mut c_void, ) -> Result<Stream>319 pub fn init(
320 ctx: &ClientContext,
321 init_params: messages::StreamInitParams,
322 data_callback: ffi::cubeb_data_callback,
323 state_callback: ffi::cubeb_state_callback,
324 user_ptr: *mut c_void,
325 ) -> Result<Stream> {
326 let stm = ClientStream::init(ctx, init_params, data_callback, state_callback, user_ptr)?;
327 debug_assert_eq!(stm.user_ptr(), user_ptr);
328 Ok(stm)
329 }
330