1 // Take a look at the license at the top of the repository in the LICENSE file.
2 
3 use futures_channel::mpsc::{self, UnboundedReceiver};
4 use futures_core::Stream;
5 use futures_util::StreamExt;
6 use glib::ffi::{gboolean, gpointer};
7 use glib::prelude::*;
8 use glib::source::{Continue, Priority, SourceId};
9 use glib::translate::*;
10 use std::cell::RefCell;
11 use std::future;
12 use std::mem::transmute;
13 use std::pin::Pin;
14 use std::task::{Context, Poll};
15 
16 use crate::Bus;
17 use crate::BusSyncReply;
18 use crate::Message;
19 use crate::MessageType;
20 
trampoline_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>( bus: *mut ffi::GstBus, msg: *mut ffi::GstMessage, func: gpointer, ) -> gboolean21 unsafe extern "C" fn trampoline_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>(
22     bus: *mut ffi::GstBus,
23     msg: *mut ffi::GstMessage,
24     func: gpointer,
25 ) -> gboolean {
26     let func: &RefCell<F> = &*(func as *const RefCell<F>);
27     (&mut *func.borrow_mut())(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib()
28 }
29 
destroy_closure_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>( ptr: gpointer, )30 unsafe extern "C" fn destroy_closure_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>(
31     ptr: gpointer,
32 ) {
33     Box::<RefCell<F>>::from_raw(ptr as *mut _);
34 }
35 
into_raw_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>(func: F) -> gpointer36 fn into_raw_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>(func: F) -> gpointer {
37     #[allow(clippy::type_complexity)]
38     let func: Box<RefCell<F>> = Box::new(RefCell::new(func));
39     Box::into_raw(func) as gpointer
40 }
41 
trampoline_sync< F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static, >( bus: *mut ffi::GstBus, msg: *mut ffi::GstMessage, func: gpointer, ) -> ffi::GstBusSyncReply42 unsafe extern "C" fn trampoline_sync<
43     F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
44 >(
45     bus: *mut ffi::GstBus,
46     msg: *mut ffi::GstMessage,
47     func: gpointer,
48 ) -> ffi::GstBusSyncReply {
49     let f: &F = &*(func as *const F);
50     let res = f(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib();
51 
52     if res == ffi::GST_BUS_DROP {
53         ffi::gst_mini_object_unref(msg as *mut _);
54     }
55 
56     res
57 }
58 
destroy_closure_sync< F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static, >( ptr: gpointer, )59 unsafe extern "C" fn destroy_closure_sync<
60     F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
61 >(
62     ptr: gpointer,
63 ) {
64     Box::<F>::from_raw(ptr as *mut _);
65 }
66 
into_raw_sync<F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static>( func: F, ) -> gpointer67 fn into_raw_sync<F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static>(
68     func: F,
69 ) -> gpointer {
70     let func: Box<F> = Box::new(func);
71     Box::into_raw(func) as gpointer
72 }
73 
74 impl Bus {
75     #[doc(alias = "gst_bus_add_signal_watch_full")]
add_signal_watch_full(&self, priority: Priority)76     pub fn add_signal_watch_full(&self, priority: Priority) {
77         unsafe {
78             ffi::gst_bus_add_signal_watch_full(self.to_glib_none().0, priority.into_glib());
79         }
80     }
81 
82     #[doc(alias = "gst_bus_create_watch")]
create_watch<F>(&self, name: Option<&str>, priority: Priority, func: F) -> glib::Source where F: FnMut(&Bus, &Message) -> Continue + Send + 'static,83     pub fn create_watch<F>(&self, name: Option<&str>, priority: Priority, func: F) -> glib::Source
84     where
85         F: FnMut(&Bus, &Message) -> Continue + Send + 'static,
86     {
87         skip_assert_initialized!();
88         unsafe {
89             let source = ffi::gst_bus_create_watch(self.to_glib_none().0);
90             glib::ffi::g_source_set_callback(
91                 source,
92                 Some(transmute::<
93                     _,
94                     unsafe extern "C" fn(glib::ffi::gpointer) -> i32,
95                 >(trampoline_watch::<F> as *const ())),
96                 into_raw_watch(func),
97                 Some(destroy_closure_watch::<F>),
98             );
99             glib::ffi::g_source_set_priority(source, priority.into_glib());
100 
101             if let Some(name) = name {
102                 glib::ffi::g_source_set_name(source, name.to_glib_none().0);
103             }
104 
105             from_glib_full(source)
106         }
107     }
108 
109     #[doc(alias = "gst_bus_add_watch_full")]
add_watch<F>(&self, func: F) -> Result<SourceId, glib::BoolError> where F: FnMut(&Bus, &Message) -> Continue + Send + 'static,110     pub fn add_watch<F>(&self, func: F) -> Result<SourceId, glib::BoolError>
111     where
112         F: FnMut(&Bus, &Message) -> Continue + Send + 'static,
113     {
114         unsafe {
115             let res = ffi::gst_bus_add_watch_full(
116                 self.to_glib_none().0,
117                 glib::ffi::G_PRIORITY_DEFAULT,
118                 Some(trampoline_watch::<F>),
119                 into_raw_watch(func),
120                 Some(destroy_closure_watch::<F>),
121             );
122 
123             if res == 0 {
124                 Err(glib::bool_error!("Bus already has a watch"))
125             } else {
126                 Ok(from_glib(res))
127             }
128         }
129     }
130 
add_watch_local<F>(&self, func: F) -> Result<SourceId, glib::BoolError> where F: FnMut(&Bus, &Message) -> Continue + 'static,131     pub fn add_watch_local<F>(&self, func: F) -> Result<SourceId, glib::BoolError>
132     where
133         F: FnMut(&Bus, &Message) -> Continue + 'static,
134     {
135         unsafe {
136             assert!(glib::MainContext::ref_thread_default().is_owner());
137 
138             let res = ffi::gst_bus_add_watch_full(
139                 self.to_glib_none().0,
140                 glib::ffi::G_PRIORITY_DEFAULT,
141                 Some(trampoline_watch::<F>),
142                 into_raw_watch(func),
143                 Some(destroy_closure_watch::<F>),
144             );
145 
146             if res == 0 {
147                 Err(glib::bool_error!("Bus already has a watch"))
148             } else {
149                 Ok(from_glib(res))
150             }
151         }
152     }
153 
154     #[doc(alias = "gst_bus_set_sync_handler")]
set_sync_handler<F>(&self, func: F) where F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,155     pub fn set_sync_handler<F>(&self, func: F)
156     where
157         F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
158     {
159         use once_cell::sync::Lazy;
160         static SET_ONCE_QUARK: Lazy<glib::Quark> =
161             Lazy::new(|| glib::Quark::from_string("gstreamer-rs-sync-handler"));
162 
163         unsafe {
164             let bus = self.to_glib_none().0;
165 
166             // This is not thread-safe before 1.16.3, see
167             // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416
168             if crate::version() < (1, 16, 3, 0) {
169                 if !glib::gobject_ffi::g_object_get_qdata(bus as *mut _, SET_ONCE_QUARK.into_glib())
170                     .is_null()
171                 {
172                     panic!("Bus sync handler can only be set once");
173                 }
174 
175                 glib::gobject_ffi::g_object_set_qdata(
176                     bus as *mut _,
177                     SET_ONCE_QUARK.into_glib(),
178                     1 as *mut _,
179                 );
180             }
181 
182             ffi::gst_bus_set_sync_handler(
183                 bus,
184                 Some(trampoline_sync::<F>),
185                 into_raw_sync(func),
186                 Some(destroy_closure_sync::<F>),
187             )
188         }
189     }
190 
unset_sync_handler(&self)191     pub fn unset_sync_handler(&self) {
192         // This is not thread-safe before 1.16.3, see
193         // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416
194         if crate::version() < (1, 16, 3, 0) {
195             return;
196         }
197 
198         unsafe {
199             use std::ptr;
200 
201             ffi::gst_bus_set_sync_handler(self.to_glib_none().0, None, ptr::null_mut(), None)
202         }
203     }
204 
iter(&self) -> Iter205     pub fn iter(&self) -> Iter {
206         self.iter_timed(Some(crate::ClockTime::ZERO))
207     }
208 
iter_timed(&self, timeout: impl Into<Option<crate::ClockTime>>) -> Iter209     pub fn iter_timed(&self, timeout: impl Into<Option<crate::ClockTime>>) -> Iter {
210         Iter {
211             bus: self,
212             timeout: timeout.into(),
213         }
214     }
215 
iter_filtered<'a>( &'a self, msg_types: &'a [MessageType], ) -> impl Iterator<Item = Message> + 'a216     pub fn iter_filtered<'a>(
217         &'a self,
218         msg_types: &'a [MessageType],
219     ) -> impl Iterator<Item = Message> + 'a {
220         self.iter_timed_filtered(Some(crate::ClockTime::ZERO), msg_types)
221     }
222 
iter_timed_filtered<'a>( &'a self, timeout: impl Into<Option<crate::ClockTime>>, msg_types: &'a [MessageType], ) -> impl Iterator<Item = Message> + 'a223     pub fn iter_timed_filtered<'a>(
224         &'a self,
225         timeout: impl Into<Option<crate::ClockTime>>,
226         msg_types: &'a [MessageType],
227     ) -> impl Iterator<Item = Message> + 'a {
228         self.iter_timed(timeout)
229             .filter(move |msg| msg_types.contains(&msg.type_()))
230     }
231 
timed_pop_filtered( &self, timeout: impl Into<Option<crate::ClockTime>> + Clone, msg_types: &[MessageType], ) -> Option<Message>232     pub fn timed_pop_filtered(
233         &self,
234         timeout: impl Into<Option<crate::ClockTime>> + Clone,
235         msg_types: &[MessageType],
236     ) -> Option<Message> {
237         loop {
238             let msg = self.timed_pop(timeout.clone())?;
239             if msg_types.contains(&msg.type_()) {
240                 return Some(msg);
241             }
242         }
243     }
244 
pop_filtered(&self, msg_types: &[MessageType]) -> Option<Message>245     pub fn pop_filtered(&self, msg_types: &[MessageType]) -> Option<Message> {
246         loop {
247             let msg = self.pop()?;
248             if msg_types.contains(&msg.type_()) {
249                 return Some(msg);
250             }
251         }
252     }
253 
stream(&self) -> BusStream254     pub fn stream(&self) -> BusStream {
255         BusStream::new(self)
256     }
257 
stream_filtered<'a>( &self, message_types: &'a [MessageType], ) -> impl Stream<Item = Message> + Unpin + Send + 'a258     pub fn stream_filtered<'a>(
259         &self,
260         message_types: &'a [MessageType],
261     ) -> impl Stream<Item = Message> + Unpin + Send + 'a {
262         self.stream().filter(move |message| {
263             let message_type = message.type_();
264 
265             future::ready(message_types.contains(&message_type))
266         })
267     }
268 }
269 
/// Iterator over the messages of a [`Bus`].
///
/// Each call to `next` pops one message from the bus via `timed_pop`, using
/// the timeout stored in the iterator.
#[derive(Debug)]
pub struct Iter<'a> {
    // Bus the messages are popped from.
    bus: &'a Bus,
    // Timeout handed to every `timed_pop` call.
    timeout: Option<crate::ClockTime>,
}
275 
276 impl<'a> Iterator for Iter<'a> {
277     type Item = Message;
278 
next(&mut self) -> Option<Message>279     fn next(&mut self) -> Option<Message> {
280         self.bus.timed_pop(self.timeout)
281     }
282 }
283 
/// Asynchronous stream of the messages posted on a [`Bus`].
///
/// Messages are forwarded through an unbounded channel by a sync handler
/// that is installed on the bus when the stream is created; dropping the
/// stream asks the bus (if still alive) to unset that handler.
#[derive(Debug)]
pub struct BusStream {
    // Weak reference to the bus, used on drop to unset the sync handler.
    bus: glib::WeakRef<Bus>,
    // Receiving end of the channel the sync handler sends messages into.
    receiver: UnboundedReceiver<Message>,
}
289 
290 impl BusStream {
new(bus: &Bus) -> Self291     fn new(bus: &Bus) -> Self {
292         skip_assert_initialized!();
293 
294         let (sender, receiver) = mpsc::unbounded();
295 
296         bus.set_sync_handler(move |_, message| {
297             let _ = sender.unbounded_send(message.to_owned());
298 
299             BusSyncReply::Drop
300         });
301 
302         Self {
303             bus: bus.downgrade(),
304             receiver,
305         }
306     }
307 }
308 
309 impl Drop for BusStream {
drop(&mut self)310     fn drop(&mut self) {
311         if let Some(bus) = self.bus.upgrade() {
312             bus.unset_sync_handler();
313         }
314     }
315 }
316 
317 impl Stream for BusStream {
318     type Item = Message;
319 
poll_next(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>>320     fn poll_next(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
321         self.receiver.poll_next_unpin(context)
322     }
323 }
324 
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    // A sync handler must see a message posted on the bus.
    #[test]
    fn test_sync_handler() {
        crate::init().unwrap();

        let bus = Bus::new();
        let received = Arc::new(Mutex::new(Vec::new()));

        let sink = Arc::clone(&received);
        bus.set_sync_handler(move |_, msg| {
            sink.lock().unwrap().push(msg.clone());
            BusSyncReply::Pass
        });

        bus.post(&crate::message::Eos::new()).unwrap();

        let received = received.lock().unwrap();
        assert_eq!(received.len(), 1);
        match received[0].view() {
            crate::MessageView::Eos(_) => (),
            _ => unreachable!(),
        }
    }

    // A message posted on the bus must come out of the bus stream.
    #[test]
    fn test_bus_stream() {
        crate::init().unwrap();

        let bus = Bus::new();
        let stream = bus.stream();

        let eos = crate::message::Eos::new();
        bus.post(&eos).unwrap();

        let (first, _) = futures_executor::block_on(stream.into_future());

        match first.unwrap().view() {
            crate::MessageView::Eos(_) => (),
            _ => unreachable!(),
        }
    }
}
371