1 // Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com> 2 // 3 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or 4 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license 5 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your 6 // option. This file may not be copied, modified, or distributed 7 // except according to those terms. 8 9 use glib::translate::*; 10 use glib_sys; 11 use gst_sys; 12 use PromiseResult; 13 use Structure; 14 use StructureRef; 15 16 use std::pin::Pin; 17 use std::task::{Context, Poll}; 18 19 glib_wrapper! { 20 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] 21 pub struct Promise(Shared<gst_sys::GstPromise>); 22 23 match fn { 24 ref => |ptr| gst_sys::gst_mini_object_ref(ptr as *mut _), 25 unref => |ptr| gst_sys::gst_mini_object_unref(ptr as *mut _), 26 get_type => || gst_sys::gst_promise_get_type(), 27 } 28 } 29 30 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] 31 pub enum PromiseError { 32 Interrupted, 33 Expired, 34 Other(PromiseResult), 35 } 36 37 impl Promise { new() -> Promise38 pub fn new() -> Promise { 39 assert_initialized_main_thread!(); 40 unsafe { from_glib_full(gst_sys::gst_promise_new()) } 41 } 42 new_with_change_func<F>(func: F) -> Promise where F: FnOnce(Result<&StructureRef, PromiseError>) + Send + 'static,43 pub fn new_with_change_func<F>(func: F) -> Promise 44 where 45 F: FnOnce(Result<&StructureRef, PromiseError>) + Send + 'static, 46 { 47 let user_data: Box<Option<F>> = Box::new(Some(func)); 48 49 unsafe extern "C" fn trampoline< 50 F: FnOnce(Result<&StructureRef, PromiseError>) + Send + 'static, 51 >( 52 promise: *mut gst_sys::GstPromise, 53 user_data: glib_sys::gpointer, 54 ) { 55 lazy_static! { 56 static ref EMPTY: Structure = Structure::new_empty("EMPTY"); 57 } 58 59 let user_data: &mut Option<F> = &mut *(user_data as *mut _); 60 let callback = user_data.take().unwrap(); 61 62 let promise: Promise = from_glib_borrow(promise); 63 64 let res = match promise.wait() { 65 // Return an empty structure if it's None as workaround for 66 // https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/issues/1300 67 PromiseResult::Replied => Ok(promise.get_reply().unwrap_or(&EMPTY)), 68 PromiseResult::Interrupted => Err(PromiseError::Interrupted), 69 PromiseResult::Expired => Err(PromiseError::Expired), 70 PromiseResult::Pending => { 71 panic!("Promise resolved but returned Pending"); 72 } 73 err => Err(PromiseError::Other(err)), 74 }; 75 76 callback(res); 77 } 78 79 unsafe extern "C" fn free_user_data< 80 F: FnOnce(Result<&StructureRef, PromiseError>) + Send + 'static, 81 >( 82 user_data: glib_sys::gpointer, 83 ) { 84 let _: Box<Option<F>> = Box::from_raw(user_data as *mut _); 85 } 86 87 unsafe { 88 from_glib_full(gst_sys::gst_promise_new_with_change_func( 89 Some(trampoline::<F>), 90 Box::into_raw(user_data) as *mut _, 91 Some(free_user_data::<F>), 92 )) 93 } 94 } 95 new_future() -> (Self, PromiseFuture)96 pub fn new_future() -> (Self, PromiseFuture) { 97 use futures_channel::oneshot; 98 99 let (sender, receiver) = oneshot::channel(); 100 101 let promise = Self::new_with_change_func(move |res| { 102 let _ = sender.send(res.map(|s| s.to_owned())); 103 }); 104 105 (promise, PromiseFuture(receiver)) 106 } 107 expire(&self)108 pub fn expire(&self) { 109 unsafe { 110 gst_sys::gst_promise_expire(self.to_glib_none().0); 111 } 112 } 113 get_reply(&self) -> Option<&StructureRef>114 pub fn get_reply(&self) -> Option<&StructureRef> { 115 unsafe { 116 let s = gst_sys::gst_promise_get_reply(self.to_glib_none().0); 117 if s.is_null() { 118 None 119 } else { 120 Some(StructureRef::from_glib_borrow(s)) 121 } 122 } 123 } 124 interrupt(&self)125 pub fn interrupt(&self) { 126 unsafe { 127 gst_sys::gst_promise_interrupt(self.to_glib_none().0); 128 } 129 } 130 reply(&self, s: Structure)131 pub fn reply(&self, s: Structure) { 132 unsafe { 133 gst_sys::gst_promise_reply(self.to_glib_none().0, s.into_ptr()); 134 } 135 } 136 wait(&self) -> PromiseResult137 pub fn wait(&self) -> PromiseResult { 138 unsafe { from_glib(gst_sys::gst_promise_wait(self.to_glib_none().0)) } 139 } 140 } 141 142 impl Default for Promise { default() -> Self143 fn default() -> Self { 144 Self::new() 145 } 146 } 147 148 unsafe impl Send for Promise {} 149 unsafe impl Sync for Promise {} 150 151 #[derive(Debug)] 152 pub struct PromiseFuture(futures_channel::oneshot::Receiver<Result<Structure, PromiseError>>); 153 154 impl std::future::Future for PromiseFuture { 155 type Output = Result<Structure, PromiseError>; 156 poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Self::Output>157 fn poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Self::Output> { 158 match Pin::new(&mut self.0).poll(context) { 159 Poll::Ready(Err(_)) => panic!("Sender dropped before callback was called"), 160 Poll::Ready(Ok(res)) => Poll::Ready(res), 161 Poll::Pending => Poll::Pending, 162 } 163 } 164 } 165 166 #[cfg(test)] 167 mod tests { 168 use super::*; 169 use std::sync::mpsc::channel; 170 use std::thread; 171 172 #[test] test_change_func()173 fn test_change_func() { 174 ::init().unwrap(); 175 176 let (sender, receiver) = channel(); 177 let promise = Promise::new_with_change_func(move |res| { 178 sender.send(res.map(|s| s.to_owned())).unwrap(); 179 }); 180 181 thread::spawn(move || { 182 promise.reply(crate::Structure::new("foo/bar", &[])); 183 }); 184 185 let res = receiver.recv().unwrap(); 186 let res = res.expect("promise failed"); 187 assert_eq!(res.get_name(), "foo/bar"); 188 } 189 } 190